// blob: 6d2ad4fafc5c0e99a3f66d8f2c58df51aef5469b [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use std::sync::Arc;
use std::time::Duration;
use iggy::consumer_ext::{IggyConsumerMessageExt, MessageConsumer};
use iggy::prelude::{
AutoCommit as RustAutoCommit, AutoCommitAfter as RustAutoCommitAfter,
AutoCommitWhen as RustAutoCommitWhen, *,
};
use iggy::prelude::{IggyConsumer as RustIggyConsumer, IggyError, ReceivedMessage};
use pyo3::types::{PyDelta, PyDeltaAccess};
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::{future_into_py, get_runtime, into_future, scope};
use pyo3_async_runtimes::TaskLocals;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_complex_enum, gen_stub_pymethods};
use tokio::sync::oneshot::Sender;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use crate::identifier::PyIdentifier;
use crate::iterator::ReceiveMessageIterator;
use crate::receive_message::ReceiveMessage;
/// A Python class representing the Iggy consumer.
/// It wraps the Rust `IggyConsumer` and exposes its asynchronous operations
/// to Python as awaitables.
#[gen_stub_pyclass]
#[pyclass]
pub struct IggyConsumer {
// Shared handle to the underlying Rust consumer. The methods in this file clone
// the `Arc` to move the consumer into futures / iterators and go through the
// tokio `Mutex` (`lock().await` or `blocking_lock()`) whenever they call into it.
pub(crate) inner: Arc<Mutex<RustIggyConsumer>>,
}
#[gen_stub_pymethods]
#[pymethods]
impl IggyConsumer {
/// Get the last consumed offset or `None` if no offset has been consumed yet.
fn get_last_consumed_offset(&self, partition_id: u32) -> Option<u64> {
self.inner
.blocking_lock()
.get_last_consumed_offset(partition_id)
}
/// Get the last stored offset or `None` if no offset has been stored yet.
fn get_last_stored_offset(&self, partition_id: u32) -> Option<u64> {
self.inner
.blocking_lock()
.get_last_stored_offset(partition_id)
}
/// Gets the name of the consumer group.
fn name(&self) -> String {
self.inner.blocking_lock().name().to_string()
}
/// Gets the current partition id or `0` if no messages have been polled yet.
fn partition_id(&self) -> u32 {
self.inner.blocking_lock().partition_id()
}
/// Gets the name of the stream this consumer group is configured for.
fn stream(&self) -> PyIdentifier {
self.inner.blocking_lock().stream().into()
}
/// Gets the name of the topic this consumer group is configured for.
fn topic(&self) -> PyIdentifier {
self.inner.blocking_lock().topic().into()
}
/// Stores the provided offset for the provided partition id or if none is specified
/// uses the current partition id for the consumer group.
///
/// Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError`
/// if the operation fails.
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
fn store_offset<'a>(
&self,
py: Python<'a>,
offset: u64,
partition_id: Option<u32>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self.inner.clone();
future_into_py(py, async move {
inner
.lock()
.await
.store_offset(offset, partition_id)
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}")))
})
}
/// Deletes the offset for the provided partition id or if none is specified
/// uses the current partition id for the consumer group.
///
/// Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError`
/// if the operation fails.
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
fn delete_offset<'a>(
&self,
py: Python<'a>,
partition_id: Option<u32>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self.inner.clone();
future_into_py(py, async move {
inner
.lock()
.await
.delete_offset(partition_id)
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}")))
})
}
/// Asynchronously iterate over `ReceiveMessage`s.
///
/// Returns an async iterator that raises `StopAsyncIteration` when no more messages are available
/// or a `PyRuntimeError` on failure.
///
/// Note: This method does not currently support `AutoCommit.After`.
/// For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`,
/// only the interval part is applied; the `after` mode is ignored.
/// Use `consume_messages()` if you need commit-after-processing semantics.
#[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))]
fn iter_messages(&self) -> ReceiveMessageIterator {
let inner = self.inner.clone();
ReceiveMessageIterator { inner }
}
/// Consumes messages continuously using a callback function and an optional `asyncio.Event` for signaling shutdown.
///
/// Returns an awaitable that completes when shutdown is signaled or a PyRuntimeError on failure.
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
fn consume_messages<'a>(
&self,
py: Python<'a>,
#[gen_stub(override_type(type_repr="collections.abc.Callable[[ReceiveMessage], collections.abc.Awaitable[None]]", imports=("collections.abc")))]
callback: Bound<'a, PyAny>,
#[gen_stub(override_type(type_repr="typing.Optional[asyncio.Event]", imports=("asyncio")))]
shutdown_event: Option<Bound<'a, PyAny>>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self.inner.clone();
let callback: Py<PyAny> = callback.unbind();
let shutdown_event: Option<Py<PyAny>> = shutdown_event.map(|e| e.unbind());
future_into_py(py, async {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let task_locals = Python::attach(pyo3_async_runtimes::tokio::get_current_locals)?;
let handle_consume = get_runtime().spawn(scope(task_locals, async move {
let task_locals =
Python::attach(pyo3_async_runtimes::tokio::get_current_locals).unwrap();
let consumer = PyCallbackConsumer {
callback: Arc::new(callback),
task_locals: Arc::new(Mutex::new(task_locals)),
};
let mut inner = inner.lock().await;
inner.consume_messages(&consumer, shutdown_rx).await
}));
let consume_result;
if let Some(shutdown_event) = shutdown_event {
let task_locals = Python::attach(pyo3_async_runtimes::tokio::get_current_locals)?;
async fn shutdown_impl(
shutdown_event: Py<PyAny>,
shutdown_tx: Sender<()>,
) -> PyResult<()> {
Python::attach(|py| {
into_future(
shutdown_event
.bind(py)
.as_any()
.call_method0("wait")
.unwrap(),
)
})?
.await?;
shutdown_tx.send(()).map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}"))
})?;
Ok(())
}
let handle_shutdown: JoinHandle<Result<(), PyErr>> = get_runtime().spawn(scope(
task_locals,
shutdown_impl(shutdown_event, shutdown_tx),
));
let shutdown_result;
(consume_result, shutdown_result) = tokio::join!(handle_consume, handle_shutdown);
shutdown_result.unwrap()?;
} else {
consume_result = handle_consume.await;
}
consume_result
.unwrap()
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}")))?;
Ok(())
})
}
}
/// Adapter that implements iggy's `MessageConsumer` by forwarding every received
/// message to a Python callable.
struct PyCallbackConsumer {
/// Python callable invoked with one `ReceiveMessage` per consumed message; the
/// object it returns is converted with `into_future` and awaited.
callback: Arc<Py<PyAny>>,
/// Task locals that are cloned for each callback invocation and handed to `scope`.
task_locals: Arc<Mutex<TaskLocals>>,
}
impl MessageConsumer for PyCallbackConsumer {
async fn consume(&self, received: ReceivedMessage) -> Result<(), IggyError> {
let callback = self.callback.clone();
let task_locals = self.task_locals.clone().lock_owned().await;
let task_locals = task_locals.clone();
let message = ReceiveMessage {
inner: received.message,
partition_id: received.partition_id,
};
get_runtime()
.spawn(scope(task_locals, async move {
Python::attach(|py| {
let callback = callback.bind(py);
let result = callback.as_any().call1((message,))?;
into_future(result)
})
}))
.await
.map_err(|_| IggyError::CannotReadMessage)?
.map_err(|_| IggyError::CannotReadMessage)?
.await
.map_err(|_| IggyError::CannotReadMessage)?;
Ok(())
}
}
/// The auto-commit configuration for storing the offset on the server.
// NOTE(review): unlike `AutoCommitWhen` / `AutoCommitAfter`, this enum does not use
// `#[derive(Debug, PartialEq, Copy, Clone)]` — presumably because several variants
// hold a `Py<PyDelta>`; confirm before re-enabling any of those derives.
#[gen_stub_pyclass_complex_enum]
#[pyclass]
pub enum AutoCommit {
/// The auto-commit is disabled and the offset must be stored manually by the consumer.
Disabled(),
/// The auto-commit is enabled and the offset is stored on the server after a certain interval.
Interval(Py<PyDelta>),
/// The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode when consuming the messages.
IntervalOrWhen(Py<PyDelta>, AutoCommitWhen),
/// The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode after consuming the messages.
IntervalOrAfter(Py<PyDelta>, AutoCommitAfter),
/// The auto-commit is enabled and the offset is stored on the server depending on the mode when consuming the messages.
When(AutoCommitWhen),
/// The auto-commit is enabled and the offset is stored on the server depending on the mode after consuming the messages.
After(AutoCommitAfter),
}
impl From<&AutoCommit> for RustAutoCommit {
fn from(val: &AutoCommit) -> RustAutoCommit {
match val {
AutoCommit::Disabled() => RustAutoCommit::Disabled,
AutoCommit::Interval(delta) => {
let duration = py_delta_to_iggy_duration(delta);
RustAutoCommit::Interval(duration)
}
AutoCommit::IntervalOrWhen(delta, when) => {
let duration = py_delta_to_iggy_duration(delta);
RustAutoCommit::IntervalOrWhen(duration, when.into())
}
AutoCommit::IntervalOrAfter(delta, after) => {
let duration = py_delta_to_iggy_duration(delta);
RustAutoCommit::IntervalOrAfter(duration, after.into())
}
AutoCommit::When(when) => RustAutoCommit::When(when.into()),
AutoCommit::After(after) => RustAutoCommit::After(after.into()),
}
}
}
/// The auto-commit mode for storing the offset on the server.
// NOTE(review): the payload-free variants are written as empty tuple variants
// (`Name()`), presumably a requirement of exposing this as a pyo3 "complex enum"
// (see `gen_stub_pyclass_complex_enum`) — confirm against the pyo3 docs.
#[derive(Debug, PartialEq, Copy, Clone)]
#[gen_stub_pyclass_complex_enum]
#[pyclass]
pub enum AutoCommitWhen {
/// The offset is stored on the server when the messages are received.
PollingMessages(),
/// The offset is stored on the server when all the messages are consumed.
ConsumingAllMessages(),
/// The offset is stored on the server when consuming each message.
ConsumingEachMessage(),
/// The offset is stored on the server when consuming every Nth message.
ConsumingEveryNthMessage(u32),
}
impl From<&AutoCommitWhen> for RustAutoCommitWhen {
    /// Translates the Python-facing "when" mode into its iggy counterpart,
    /// variant for variant.
    fn from(val: &AutoCommitWhen) -> Self {
        match *val {
            AutoCommitWhen::PollingMessages() => Self::PollingMessages,
            AutoCommitWhen::ConsumingAllMessages() => Self::ConsumingAllMessages,
            AutoCommitWhen::ConsumingEachMessage() => Self::ConsumingEachMessage,
            AutoCommitWhen::ConsumingEveryNthMessage(every) => {
                Self::ConsumingEveryNthMessage(every)
            }
        }
    }
}
/// The auto-commit mode for storing the offset on the server **after** receiving the messages.
#[derive(Debug, PartialEq, Copy, Clone)]
#[gen_stub_pyclass_complex_enum]
#[pyclass]
// Every variant shares the `Consuming` prefix, which is what clippy's
// `enum_variant_names` lint flags; the names match the corresponding
// `RustAutoCommitAfter` variants used in the `From` impl below.
#[allow(clippy::enum_variant_names)]
pub enum AutoCommitAfter {
/// The offset is stored on the server after all the messages are consumed.
ConsumingAllMessages(),
/// The offset is stored on the server after consuming each message.
ConsumingEachMessage(),
/// The offset is stored on the server after consuming every Nth message.
ConsumingEveryNthMessage(u32),
}
impl From<&AutoCommitAfter> for RustAutoCommitAfter {
    /// Translates the Python-facing "after" mode into its iggy counterpart,
    /// variant for variant.
    fn from(val: &AutoCommitAfter) -> Self {
        match *val {
            AutoCommitAfter::ConsumingAllMessages() => Self::ConsumingAllMessages,
            AutoCommitAfter::ConsumingEachMessage() => Self::ConsumingEachMessage,
            AutoCommitAfter::ConsumingEveryNthMessage(every) => {
                Self::ConsumingEveryNthMessage(every)
            }
        }
    }
}
/// Converts a Python `datetime.timedelta` into an `IggyDuration`.
///
/// The timedelta is read through its `days`, `seconds` and `microseconds`
/// components. The arithmetic is done in a wide integer type so that large day
/// counts cannot overflow, and a zero or negative timedelta yields a zero
/// duration (an `IggyDuration` is built from an unsigned `std::time::Duration`
/// and cannot represent a negative span).
pub fn py_delta_to_iggy_duration(delta1: &Py<PyDelta>) -> IggyDuration {
    Python::attach(|py| {
        let delta = delta1.bind(py);
        IggyDuration::new(timedelta_parts_to_duration(
            delta.get_days(),
            delta.get_seconds(),
            delta.get_microseconds(),
        ))
    })
}

/// Combines timedelta components into a `Duration`, clamping non-positive totals to zero.
fn timedelta_parts_to_duration(days: i32, seconds: i32, microseconds: i32) -> Duration {
    const SECONDS_PER_DAY: i128 = 24 * 60 * 60;
    const MICROS_PER_SECOND: i128 = 1_000_000;

    // i128 comfortably holds the total for any combination of i32 components,
    // including negative `days` paired with positive `seconds` / `microseconds`.
    let total_micros = (i128::from(days) * SECONDS_PER_DAY + i128::from(seconds))
        * MICROS_PER_SECOND
        + i128::from(microseconds);
    if total_micros <= 0 {
        return Duration::ZERO;
    }

    // The fallbacks are unreachable for i32 inputs; they only avoid lossy `as` casts.
    let whole_seconds = u64::try_from(total_micros / MICROS_PER_SECOND).unwrap_or(u64::MAX);
    let sub_second_micros = u32::try_from(total_micros % MICROS_PER_SECOND).unwrap_or(0);
    Duration::new(whole_seconds, sub_second_micros * 1_000)
}