The following example creates a Python producer for the my-topic
topic and sends 10 messages on that topic:
import pulsar client = pulsar.Client('pulsar://localhost:6650') producer = client.create_producer('my-topic') for i in range(10): producer.send(('Hello-%d' % i).encode('utf-8')) client.close()
The following example creates a consumer with the my-subscription
subscription name on the my-topic
topic, receives incoming messages, prints the content and ID of messages that arrive, and acknowledges each message to the Pulsar broker.
import pulsar client = pulsar.Client('pulsar://localhost:6650') consumer = client.subscribe('my-topic', 'my-subscription') while True: msg = consumer.receive() try: print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # Acknowledge successful processing of the message consumer.acknowledge(msg) except Exception: # Message failed to be processed consumer.negative_acknowledge(msg) client.close()
This example shows how to configure negative acknowledgment.
from pulsar import Client, schema client = Client('pulsar://localhost:6650') consumer = client.subscribe('negative_acks','test',schema=schema.StringSchema()) producer = client.create_producer('negative_acks',schema=schema.StringSchema()) for i in range(10): print('send msg "hello-%d"' % i) producer.send_async('hello-%d' % i, callback=None) producer.flush() for i in range(10): msg = consumer.receive() consumer.negative_acknowledge(msg) print('receive and nack msg "%s"' % msg.data()) for i in range(10): msg = consumer.receive() consumer.acknowledge(msg) print('receive and ack msg "%s"' % msg.data()) try: # No more messages expected msg = consumer.receive(100) except: print("no more msg") pass
You can use the Pulsar Python API to use the Pulsar reader interface. Here's an example:
# MessageId taken from a previously fetched message msg_id = msg.message_id() reader = client.create_reader('my-topic', msg_id) while True: msg = reader.read_next() print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # No acknowledgment
You can use different built-in schema types in Pulsar. All the definitions are in the pulsar.schema
package.
Schema | Notes |
---|---|
BytesSchema | Get the raw payload as a bytes object. No serialization/deserialization are performed. This is the default schema mode |
StringSchema | Encode/decode payload as a UTF-8 string. Uses str objects |
JsonSchema | Require record definition. Serializes the record into standard JSON payload |
AvroSchema | Require record definition. Serializes in AVRO format |
The schema definition is done through a class that inherits from pulsar.schema.Record
.
This class has a number of fields that can be of either pulsar.schema.Field
type or another nested Record
. All the fields are specified in the pulsar.schema
package. The fields are matching the AVRO field types.
Field Type | Python Type | Notes |
---|---|---|
Boolean | bool | |
Integer | int | |
Long | int | |
Float | float | |
Double | float | |
Bytes | bytes | |
String | str | |
Array | list | Need to specify record type for items. |
Map | dict | Key is always String . Need to specify value type. |
Additionally, any Python Enum
type can be used as a valid field type.
When adding a field, you can use these parameters in the constructor.
Argument | Default | Notes |
---|---|---|
default | None | Set a default value for the field, such as a = Integer(default=5) . |
required | False | Mark the field as “required”. It is set in the schema accordingly. |
class Example(Record): a = String() b = Integer() c = Array(String()) i = Map(String())
from enum import Enum class Color(Enum): red = 1 green = 2 blue = 3 class Example(Record): name = String() color = Color
class MySubRecord(Record): x = Integer() y = Long() z = String() class Example(Record): a = String() sub = MySubRecord()
Set the namespace for the Avro Record schema using the special field _avro_namespace
.
class NamespaceDemo(Record): _avro_namespace = 'xxx.xxx.xxx' x = String() y = Integer()
The schema definition is like this.
{ "name": "NamespaceDemo", "namespace": "xxx.xxx.xxx", "type": "record", "fields": [ {"name": "x", "type": ["null", "string"]}, {"name": "y", "type": ["null", "int"]} ] }
Before the producer is created, the Pulsar broker validates that the existing topic schema is the correct type and that the format is compatible with the schema definition of a class. If the format of the topic schema is incompatible with the schema definition, an exception occurs in the producer creation.
Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class.
Similarly, for a consumer or reader, the consumer returns an object (which is an instance of the schema record class) rather than raw bytes.
Example
consumer = client.subscribe( topic='my-topic', subscription_name='my-subscription', schema=AvroSchema(Example) ) while True: msg = consumer.receive() ex = msg.value() try: print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c)) # Acknowledge successful processing of the message consumer.acknowledge(msg) except Exception: # Message failed to be processed consumer.negative_acknowledge(msg)
For more code examples, see Schema - Get started.