blob: 445d47711fbb8d5d8128589bd03721c3cf934730 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
The Pulsar Python client APIs that work with the asyncio module.
"""
import asyncio
import functools
from typing import Any
import _pulsar
import pulsar
class PulsarException(Exception):
    """
    The exception that wraps the Pulsar error code.

    It derives from `Exception` rather than directly from `BaseException`,
    so generic ``except Exception`` handlers catch it. Handlers written for
    `PulsarException` or `BaseException` keep working unchanged.
    """

    def __init__(self, result: pulsar.Result) -> None:
        """
        Create the Pulsar exception.

        Parameters
        ----------
        result: pulsar.Result
            The error code of the underlying Pulsar APIs.
        """
        # Hand the error code to the base class so that `args` carries it.
        super().__init__(result)
        self._result = result

    def error(self) -> pulsar.Result:
        """
        Returns the Pulsar error code.
        """
        return self._result

    def __str__(self) -> str:
        """
        Convert the exception to string.

        The string is the numeric value of the error code followed by its
        name, separated by a single space.
        """
        return f'{self._result.value} {self._result.name}'
class Producer:
    """
    Asynchronous Pulsar producer that publishes messages on a topic.
    """

    def __init__(self, producer: _pulsar.Producer) -> None:
        """
        Wrap a producer object of the C extension.

        This constructor is not meant to be called by users. Obtain a
        producer from `Client.create_producer` instead.

        Parameters
        ----------
        producer: _pulsar.Producer
            The underlying Producer object from the C extension.
        """
        self._producer: _pulsar.Producer = producer

    async def send(self, content: bytes) -> pulsar.MessageId:
        """
        Publish one message and wait until the send completes.

        Parameters
        ----------
        content: bytes
            The message payload

        Returns
        -------
        pulsar.MessageId
            The message id that represents the persisted position of the
            message.

        Raises
        ------
        PulsarException
        """
        message_builder = _pulsar.MessageBuilder()
        message_builder.content(content)

        # The future is completed by `_set_future` once the underlying
        # producer invokes the callback.
        pending = asyncio.get_running_loop().create_future()
        on_sent = functools.partial(_set_future, pending)
        self._producer.send_async(message_builder.build(), on_sent)

        raw_id = await pending
        # Rebuild the id as a `pulsar.MessageId` from its four components.
        partition = raw_id.partition()
        ledger_id = raw_id.ledger_id()
        entry_id = raw_id.entry_id()
        batch_index = raw_id.batch_index()
        return pulsar.MessageId(partition, ledger_id, entry_id, batch_index)

    async def close(self) -> None:
        """
        Close the producer and wait until the close completes.

        Raises
        ------
        PulsarException
        """
        pending = asyncio.get_running_loop().create_future()
        # The close callback carries only a result code, so the value of
        # the future is bound to None up front.
        on_closed = functools.partial(_set_future, pending, value=None)
        self._producer.close_async(on_closed)
        await pending
class Client:
    """
    The asyncio counterpart of `pulsar.Client`.
    """

    def __init__(self, service_url, **kwargs) -> None:
        """
        Create the client; see `pulsar.Client.__init__` for the arguments.

        The arguments are forwarded unchanged to `pulsar.Client`, and the
        `_client` object of that instance is kept.
        """
        sync_client = pulsar.Client(service_url, **kwargs)
        self._client: _pulsar.Client = sync_client._client

    async def create_producer(self, topic: str) -> Producer:
        """
        Create a new producer on a given topic.

        Parameters
        ----------
        topic: str
            The topic name

        Returns
        -------
        Producer
            The producer created

        Raises
        ------
        PulsarException
        """
        pending = asyncio.get_running_loop().create_future()
        # TODO: add more configs
        producer_conf = _pulsar.ProducerConfiguration()
        on_created = functools.partial(_set_future, pending)
        self._client.create_producer_async(topic, producer_conf, on_created)
        underlying_producer = await pending
        return Producer(underlying_producer)

    async def close(self) -> None:
        """
        Close the client and all the associated producers and consumers.

        Raises
        ------
        PulsarException
        """
        pending = asyncio.get_running_loop().create_future()
        # The close callback carries only a result code, so the value of
        # the future is bound to None up front.
        on_closed = functools.partial(_set_future, pending, value=None)
        self._client.close_async(on_closed)
        await pending
def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any):
    """
    Complete an asyncio future with the outcome of an asynchronous Pulsar call.

    This function is bound with `functools.partial` and passed as the
    callback of the ``*_async`` methods of the underlying client objects.
    The completion is scheduled on the event loop that owns the future via
    `call_soon_threadsafe` rather than performed directly.
    NOTE(review): presumably the callback is invoked from a thread other
    than the event loop thread -- confirm against the C extension.

    Parameters
    ----------
    future: asyncio.Future
        The future to complete.
    result: _pulsar.Result
        The result code reported by the underlying call.
    value: Any
        The value to set on the future when `result` is `_pulsar.Result.Ok`.
    """
    def complete():
        # The awaiting coroutine may have been cancelled (e.g. by a timeout)
        # before the callback arrived. Then the future is already done and
        # set_result()/set_exception() would raise InvalidStateError inside
        # the event loop, so the late outcome is dropped instead.
        if future.done():
            return
        if result == _pulsar.Result.Ok:
            future.set_result(value)
        else:
            future.set_exception(PulsarException(result))

    future.get_loop().call_soon_threadsafe(complete)