blob: 65ed8520679a52ab7b915ecfb343bab6b62314b6 [file] [log] [blame]
use crate::binary::binary_client::BinaryClient;
use crate::binary::{fail_if_not_authenticated, mapper};
use crate::client::MessageClient;
use crate::command::{POLL_MESSAGES_CODE, SEND_MESSAGES_CODE};
use crate::consumer::Consumer;
use crate::error::IggyError;
use crate::identifier::Identifier;
use crate::messages::poll_messages::PollingStrategy;
use crate::messages::send_messages::{Message, Partitioning};
use crate::messages::{poll_messages, send_messages};
use crate::models::messages::PolledMessages;
#[async_trait::async_trait]
impl<B: BinaryClient> MessageClient for B {
async fn poll_messages(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
consumer: &Consumer,
strategy: &PollingStrategy,
count: u32,
auto_commit: bool,
) -> Result<PolledMessages, IggyError> {
fail_if_not_authenticated(self).await?;
let response = self
.send_raw_with_response(
POLL_MESSAGES_CODE,
poll_messages::as_bytes(
stream_id,
topic_id,
partition_id,
consumer,
strategy,
count,
auto_commit,
),
)
.await?;
mapper::map_polled_messages(response)
}
async fn send_messages(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partitioning: &Partitioning,
messages: &mut [Message],
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_raw_with_response(
SEND_MESSAGES_CODE,
send_messages::as_bytes(stream_id, topic_id, partitioning, messages),
)
.await?;
Ok(())
}
}