| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ |
| The Pulsar Python client APIs that work with the asyncio module. |
| """ |
| |
| import asyncio |
| import functools |
| from typing import Any |
| |
| import _pulsar |
| import pulsar |
| |
| class PulsarException(BaseException): |
| """ |
| The exception that wraps the Pulsar error code |
| """ |
| |
| def __init__(self, result: pulsar.Result) -> None: |
| """ |
| Create the Pulsar exception. |
| |
| Parameters |
| ---------- |
| result: pulsar.Result |
| The error code of the underlying Pulsar APIs. |
| """ |
| self._result = result |
| |
| def error(self) -> pulsar.Result: |
| """ |
| Returns the Pulsar error code. |
| """ |
| return self._result |
| |
| def __str__(self): |
| """ |
| Convert the exception to string. |
| """ |
| return f'{self._result.value} {self._result.name}' |
| |
| class Producer: |
| """ |
| The Pulsar message producer, used to publish messages on a topic. |
| """ |
| |
| def __init__(self, producer: _pulsar.Producer) -> None: |
| """ |
| Create the producer. |
| Users should not call this constructor directly. Instead, create the |
| producer via `Client.create_producer`. |
| |
| Parameters |
| ---------- |
| producer: _pulsar.Producer |
| The underlying Producer object from the C extension. |
| """ |
| self._producer: _pulsar.Producer = producer |
| |
| async def send(self, content: bytes) -> pulsar.MessageId: |
| """ |
| Send a message asynchronously. |
| |
| parameters |
| ---------- |
| content: bytes |
| The message payload |
| |
| Returns |
| ------- |
| pulsar.MessageId |
| The message id that represents the persisted position of the message. |
| |
| Raises |
| ------ |
| PulsarException |
| """ |
| builder = _pulsar.MessageBuilder() |
| builder.content(content) |
| future = asyncio.get_running_loop().create_future() |
| self._producer.send_async(builder.build(), functools.partial(_set_future, future)) |
| msg_id = await future |
| return pulsar.MessageId( |
| msg_id.partition(), |
| msg_id.ledger_id(), |
| msg_id.entry_id(), |
| msg_id.batch_index(), |
| ) |
| |
| async def close(self) -> None: |
| """ |
| Close the producer. |
| |
| Raises |
| ------ |
| PulsarException |
| """ |
| future = asyncio.get_running_loop().create_future() |
| self._producer.close_async(functools.partial(_set_future, future, value=None)) |
| await future |
| |
| class Client: |
| """ |
| The asynchronous version of `pulsar.Client`. |
| """ |
| |
| def __init__(self, service_url, **kwargs) -> None: |
| """ |
| See `pulsar.Client.__init__` |
| """ |
| self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client |
| |
| async def create_producer(self, topic: str) -> Producer: |
| """ |
| Create a new producer on a given topic |
| |
| Parameters |
| ---------- |
| topic: str |
| The topic name |
| |
| Returns |
| ------- |
| Producer |
| The producer created |
| |
| Raises |
| ------ |
| PulsarException |
| """ |
| future = asyncio.get_running_loop().create_future() |
| conf = _pulsar.ProducerConfiguration() |
| # TODO: add more configs |
| self._client.create_producer_async(topic, conf, functools.partial(_set_future, future)) |
| return Producer(await future) |
| |
| async def close(self) -> None: |
| """ |
| Close the client and all the associated producers and consumers |
| |
| Raises |
| ------ |
| PulsarException |
| """ |
| future = asyncio.get_running_loop().create_future() |
| self._client.close_async(functools.partial(_set_future, future, value=None)) |
| await future |
| |
| def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any): |
| def complete(): |
| if result == _pulsar.Result.Ok: |
| future.set_result(value) |
| else: |
| future.set_exception(PulsarException(result)) |
| future.get_loop().call_soon_threadsafe(complete) |