Pulsar Python client library

The Pulsar Python client library is based on the existing C++ client library. All the same features are exposed through the Python interface.

Install

First, follow the instructions to compile the Pulsar C++ client library. This also builds the Python bindings for the library.

Currently the only supported Python version is 2.7.

To install the Python bindings:

$ cd pulsar-client-cpp/python
$ sudo python setup.py install

Examples

Producer example

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer(
                'persistent://sample/standalone/ns/my-topic')

for i in range(10):
    producer.send('Hello-%d' % i)

client.close()

Consumer example

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
        'persistent://sample/standalone/ns/my-topic',
        'my-sub')

while True:
    msg = consumer.receive()
    print("Received message '%s' id='%s'" % (msg.data(), msg.message_id()))
    consumer.acknowledge(msg)

client.close()
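
The loop above blocks forever on `receive()`. The API reference below states that `receive()` accepts a `timeout_millis` option and raises an exception when no message arrives in time. The following is a minimal sketch of a loop that stops once the topic goes quiet. The helper name `drain` and its `handle` argument are hypothetical, and because the reference does not name the exception type, the sketch catches the broad `Exception` class as an assumption.

```python
def drain(consumer, handle, timeout_millis=1000):
    """Receive and acknowledge messages until none arrives within the timeout.

    `consumer` is expected to be a pulsar Consumer and `handle` any callable
    taking the message payload. Returns the number of messages handled.
    """
    count = 0
    while True:
        try:
            msg = consumer.receive(timeout_millis=timeout_millis)
        except Exception:
            # Assumption: an exception here means that no message arrived
            # within timeout_millis. Narrow this for real use.
            return count
        handle(msg.data())
        consumer.acknowledge(msg)
        count += 1

# Usage with a consumer created as in the example above:
#   handled = drain(consumer, my_handler, timeout_millis=5000)
#   client.close()
```

Acknowledging only after `handle` returns means that a message whose handler raises stays unacknowledged.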

Async Producer example

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer(
                'persistent://sample/standalone/ns/my-topic',
                block_if_queue_full=True,
                batching_enabled=True,
                batching_max_publish_delay_ms=10
            )

def send_callback(res, msg):
    print('Message published res=%s' % res)

i = 0
while True:
    producer.send_async('Hello-%d' % i, send_callback)
    i += 1

client.close()
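
When a script sends a fixed set of messages asynchronously, it usually needs to wait for the callbacks before closing the client. The sketch below shows one way to do that with the documented `send_async(content, callback)` call and a `(res, msg)` callback. The helper name `send_all_async` is hypothetical, and the sketch assumes that callbacks may run on a thread owned by the client, which is why a lock guards the result list.

```python
import threading


def send_all_async(producer, payloads, timeout_seconds=30):
    """Send every payload with send_async() and wait for all callbacks.

    Returns the `res` values passed to the callbacks, in completion order.
    If the wait times out, the list is partial.
    """
    payloads = list(payloads)
    results = []
    if not payloads:
        return results

    lock = threading.Lock()
    all_done = threading.Event()

    def on_sent(res, msg):
        # Assumption: this may be invoked from a client thread
        with lock:
            results.append(res)
            if len(results) == len(payloads):
                all_done.set()

    for payload in payloads:
        producer.send_async(payload, on_sent)

    all_done.wait(timeout_seconds)
    with lock:
        return list(results)

# Usage with a producer created as in the example above:
#   results = send_all_async(producer, ['Hello-%d' % i for i in range(10)])
#   client.close()
```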

PyDoc API Reference

CLASSES
    Authentication
    Client
    Consumer
    Producer

    class Authentication
     |  Authentication provider object
     |  
     |  Methods defined here:
     |  
     |  __init__(self, dynamicLibPath, authParamsString)
     |      Create the authentication provider instance
     |      
     |      -- Args --
     |      dynamicLibPath   -- Path to the authentication provider shared library (e.g. 'tls.so')
     |      authParamsString -- Comma-separated list of provider-specific configuration params

    class Client
     |  Pulsar client.
     |  
     |  A single client instance can be used to create producers and consumers on
     |  multiple topics.
     |  
     |  The client will share the same connection pool and threads across all the
     |  producers and consumers.
     |  
     |  Methods defined here:
     |  
     |  __init__(self, service_url, authentication=None, operation_timeout_seconds=30,
     |      io_threads=1, message_listener_threads=1, concurrent_lookup_requests=5000,
     |      log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None,
     |      tls_allow_insecure_connection=False)
     |      Create a new Pulsar client instance
     |      
     |      -- Args --
     |      service_url  -- The Pulsar service URL, e.g. pulsar://my-broker.com:6650/
     |      
     |      -- Options --
     |      authentication                -- Set the authentication provider to be used with the broker
     |      operation_timeout_seconds     -- Set timeout on client operations (subscribe, create
     |                                       producer, close, unsubscribe)
     |      io_threads                    -- Set the number of IO threads to be used by the Pulsar client.
     |      message_listener_threads      -- Set the number of threads to be used by the Pulsar client
     |                                       when delivering messages through message listener. Default
     |                                       is 1 thread per Pulsar client.
     |                                       If using more than 1 thread, messages for distinct
     |                                       message listeners will be delivered in different threads;
     |                                       however, a single message listener will always be assigned
     |                                       to the same thread.
     |      concurrent_lookup_requests    -- Number of concurrent lookup-requests allowed on each
     |                                       broker-connection to prevent overload on broker.
     |      log_conf_file_path            -- Initialize log4cxx from a conf file
     |      use_tls                       -- Configure whether to use TLS encryption on the connection
     |      tls_trust_certs_file_path     -- Set the path to the trusted TLS certificate file
     |      tls_allow_insecure_connection -- Configure whether the Pulsar client accepts untrusted TLS
     |                                       certificates from the broker
     |  
     |  close(self)
     |      Close the client and all the associated producers and consumers
     |  
     |  create_producer(self, topic, send_timeout_millis=30000,
     |                  compression_type=_pulsar.CompressionType.None, max_pending_messages=30000,
     |                  block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000,
     |                  batching_max_allowed_size_in_bytes=131072, batching_max_publish_delay_ms=10)
     |      Create a new producer on a given topic
     |      
     |      -- Args --
     |      topic                -- the topic name
     |      
     |      -- Options --
     |      send_timeout_millis  -- If a message is not acknowledged by the server before the
     |                              send timeout expires, an error will be reported
     |      compression_type     -- Set the compression type for the producer. By default, message
     |                              payloads are not compressed. Supported compression types are:
     |                              CompressionType.LZ4 and CompressionType.ZLib
     |      max_pending_messages -- Set the max size of the queue holding the messages pending to
     |                              receive an acknowledgment from the broker.
     |      block_if_queue_full  -- Set whether send_async() operations should block when the outgoing
     |                              message queue is full.
     |      message_routing_mode -- Set the message routing mode for the partitioned producer
     |      
     |      return a new Producer object
     |  
     |  subscribe(self, topic, subscription_name,
     |            consumer_type=_pulsar.ConsumerType.Exclusive, message_listener=None,
     |            receiver_queue_size=1000, consumer_name=None,
     |            unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000)
     |      Subscribe to the given topic and subscription combination
     |      
     |      -- Args --
     |      topic             -- The name of the topic
     |      subscription_name -- The name of the subscription
     |      
     |      -- Options --
     |      consumer_type       -- Select the subscription type to be used when subscribing to the topic.
     |      message_listener    -- Sets a message listener for the consumer. When the listener is set,
     |                             application will receive messages through it. Calls to
     |                             consumer.receive() will not be allowed.
     |      receiver_queue_size -- Sets the size of the consumer receive queue. The consumer receive
     |                             queue controls how many messages can be accumulated by the Consumer
     |                             before the application calls receive(). Using a higher value could
     |                             potentially increase the consumer throughput at the expense of higher
     |                             memory utilization.
     |      
     |                             Setting the consumer queue size to zero decreases the throughput of
     |                             the consumer by disabling pre-fetching of messages. This approach
     |                             improves the message distribution on shared subscriptions by pushing
     |                             messages only to the consumers that are ready to process them.
     |                             Neither receive with timeout nor partitioned topics can be used if
     |                             the consumer queue size is zero. The receive() function call should
     |                             not be interrupted when the consumer queue size is zero. The default
     |                             value is 1000 messages and should be good for most use cases.
     |      consumer_name       -- Sets the consumer name
     |      unacked_messages_timeout_ms  -- Set the timeout in milliseconds for unacknowledged messages.
     |                                      The timeout needs to be greater than 10 seconds; an
     |                                      exception is thrown if the given value is less than 10
     |                                      seconds. If a successful acknowledgement is not sent within
     |                                      the timeout, all the unacknowledged messages are redelivered.
     |      broker_consumer_stats_cache_time_ms -- Set the time duration for which the broker side
     |                                             consumer stats will be cached in the client.
     |      
     |      return a new Consumer object
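
The TLS and authentication options of `Client.__init__` listed above can be collected in one place before the client is created. This is a minimal sketch that only uses the documented keyword names. The helper name `tls_client_kwargs`, the certificate path, and the placeholders in the usage comment are hypothetical.

```python
def tls_client_kwargs(trust_certs_path, allow_insecure=False,
                      authentication=None):
    """Build keyword arguments for pulsar.Client() with TLS enabled."""
    kwargs = {
        'use_tls': True,
        'tls_trust_certs_file_path': trust_certs_path,
        'tls_allow_insecure_connection': allow_insecure,
    }
    if authentication is not None:
        # Expected to be a pulsar.Authentication instance
        kwargs['authentication'] = authentication
    return kwargs

# Usage (service_url and the auth params are placeholders to fill in):
#   import pulsar
#   auth = pulsar.Authentication('tls.so', auth_params_string)
#   client = pulsar.Client(service_url,
#                          **tls_client_kwargs('/path/to/ca.pem',
#                                              authentication=auth))
```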

    class Consumer
     |  Pulsar consumer
     |  
     |  Methods defined here:
     |  
     |  acknowledge(self, message)
     |      Acknowledge the reception of a single message.
     |      
     |      This method will block until an acknowledgement is sent to the broker.
     |      After that, the message will not be re-delivered to this consumer.
     |      
     |      message -- The received message or message id
     |  
     |  acknowledge_cumulative(self, message)
     |      Acknowledge the reception of all the messages in the stream up to (and
     |      including) the provided message.
     |      
     |      This method will block until an acknowledgement is sent to the broker.
     |      After that, the messages will not be re-delivered to this consumer.
     |      
     |      message -- The received message or message id
     |  
     |  close(self)
     |      Close the consumer
     |  
     |  pause_message_listener(self)
     |      Pause receiving messages via the message_listener, until
     |      resume_message_listener() is called.
     |  
     |  receive(self, timeout_millis=None)
     |      Receive a single message.
     |      
     |      If a message is not immediately available, this method will block until
     |      a new message is available.
     |      
     |      timeout_millis -- If specified, the receive will raise an exception if
     |                        a message is not available within the timeout
     |  
     |  redeliver_unacknowledged_messages(self)
     |      Redelivers all the unacknowledged messages. In Failover mode, the request
     |      is ignored if the consumer is not active for the given topic. In Shared
     |      mode, the consumer's messages to be redelivered are distributed across all
     |      the connected consumers. This is a non-blocking call and doesn't throw an
     |      exception. In case the connection breaks, the messages are redelivered
     |      after reconnect.
     |  
     |  resume_message_listener(self)
     |      Resume receiving the messages via the message_listener.
     |      Asynchronously receive all the messages enqueued since
     |      pause_message_listener() was called.
     |  
     |  subscription_name(self)
     |      return the subscription name
     |  
     |  topic(self)
     |      return the topic this consumer is subscribed to
     |  
     |  unsubscribe(self)
     |      Unsubscribe the current consumer from the topic.
     |      
     |      This method will block until the operation is completed. Once the consumer
     |      is unsubscribed, no more messages will be received and subsequent new
     |      messages will not be retained for this consumer.
     |      
     |      This consumer object cannot be reused.
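
Since `acknowledge_cumulative()` covers every message up to and including the one passed in, a consumer can process a batch and acknowledge it with a single call. This is a minimal sketch under the assumption of the default Exclusive subscription. The helper name `process_batch` and its `handle` argument are hypothetical.

```python
def process_batch(consumer, handle, batch_size=100):
    """Receive `batch_size` messages, then acknowledge them in one call.

    Returns the last message received, or None if batch_size is 0.
    """
    last = None
    for _ in range(batch_size):
        last = consumer.receive()
        handle(last.data())
    if last is not None:
        # Acknowledges every message up to and including `last`
        consumer.acknowledge_cumulative(last)
    return last

# Usage with a consumer created as in the Examples section:
#   while True:
#       process_batch(consumer, my_handler, batch_size=50)
```

If `handle` raises partway through a batch, nothing from that batch is acknowledged.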

    class Producer
     |  Producer object
     |  
     |  The producer is used to publish messages on a topic
     |  
     |  Methods defined here:
     |  
     |  close(self)
     |      Close the producer
     |  
     |  send(self, content, properties=None, partition_key=None, replication_clusters=None,
     |       disable_replication=False)
     |      Publish a message on the topic. Blocks until the message is acknowledged
     |      
     |      -- Args --
     |      content -- A string with the message payload
     |      
     |      -- Options --
     |      properties           -- dict of application defined string properties
     |      partition_key        -- Set partition key for the message routing. Hash of this key is used to
     |                              determine message's destination partition
     |      replication_clusters -- Override namespace replication clusters. Note that it is the
     |                              caller's responsibility to provide valid cluster names, and that
     |                              all clusters have been previously configured as destinations.
     |                              Given an empty list, the message will replicate per the namespace
     |                              configuration.
     |      disable_replication  -- Do not replicate this message
     |  
     |  send_async(self, content, callback, properties=None, partition_key=None,
     |             replication_clusters=None, disable_replication=False)
     |      Send a message asynchronously
     |      
     |      The `callback` will be invoked once the message has been acknowledged
     |      by the broker.
     |      
     |      Example:
     |      def callback(res, msg):
     |          print('Message published: %s' % res)
     |      
     |      producer.send_async(msg, callback)
     |      
     |      When the producer queue is full, by default the message will be rejected
     |      and the callback invoked with an error code.
     |      
     |      -- Args --
     |      content -- A string with the message payload
     |      
     |      -- Options --
     |      properties           -- dict of application defined string properties
     |      partition_key        -- Set partition key for the message routing. Hash of this key is used to
     |                              determine message's destination partition
     |      replication_clusters -- Override namespace replication clusters. Note that it is the
     |                              caller's responsibility to provide valid cluster names, and that
     |                              all clusters have been previously configured as destinations.
     |                              Given an empty list, the message will replicate per the namespace
     |                              configuration.
     |      disable_replication  -- Do not replicate this message
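
The `properties` and `partition_key` options of `send()` can be combined to attach metadata to a message and to keep related messages on the same partition. This is a minimal sketch of the call shape. The helper name `send_order_event` and the property keys are hypothetical and are not part of the Pulsar API.

```python
def send_order_event(producer, order_id, payload):
    """Publish `payload` with metadata attached.

    `producer` is expected to be a pulsar Producer.
    """
    producer.send(
        payload,
        # Application-defined string properties carried with the message
        properties={'order-id': str(order_id), 'source': 'example'},
        # The hash of this key determines the destination partition
        partition_key=str(order_id))

# Usage with a producer created as in the Examples section:
#   send_order_event(producer, 42, 'order-created')
```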