blob: 4f75c6775c7c10911df9ae5bed9abd070a03f9f1 [file] [log] [blame]
use crate::client::TopicClient;
use crate::compression::compression_algorithm::CompressionAlgorithm;
use crate::error::IggyError;
use crate::http::client::HttpClient;
use crate::http::HttpTransport;
use crate::identifier::Identifier;
use crate::models::topic::{Topic, TopicDetails};
use crate::topics::create_topic::CreateTopic;
use crate::topics::update_topic::UpdateTopic;
use crate::utils::expiry::IggyExpiry;
use crate::utils::topic_size::MaxTopicSize;
use async_trait::async_trait;
#[async_trait]
impl TopicClient for HttpClient {
async fn get_topic(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<Option<TopicDetails>, IggyError> {
let response = self
.get(&get_details_path(
&stream_id.as_cow_str(),
&topic_id.as_cow_str(),
))
.await;
if let Err(error) = response {
if matches!(error, IggyError::ResourceNotFound(_)) {
return Ok(None);
}
return Err(error);
}
let topic = response?
.json()
.await
.map_err(|_| IggyError::InvalidJsonResponse)?;
Ok(Some(topic))
}
async fn get_topics(&self, stream_id: &Identifier) -> Result<Vec<Topic>, IggyError> {
let response = self.get(&get_path(&stream_id.as_cow_str())).await?;
let topics = response
.json()
.await
.map_err(|_| IggyError::InvalidJsonResponse)?;
Ok(topics)
}
async fn create_topic(
&self,
stream_id: &Identifier,
name: &str,
partitions_count: u32,
compression_algorithm: CompressionAlgorithm,
replication_factor: Option<u8>,
topic_id: Option<u32>,
message_expiry: IggyExpiry,
max_topic_size: MaxTopicSize,
) -> Result<TopicDetails, IggyError> {
let response = self
.post(
&get_path(&stream_id.as_cow_str()),
&CreateTopic {
stream_id: stream_id.clone(),
name: name.to_string(),
partitions_count,
compression_algorithm,
replication_factor,
topic_id,
message_expiry,
max_topic_size,
},
)
.await?;
let topic = response
.json()
.await
.map_err(|_| IggyError::InvalidJsonResponse)?;
Ok(topic)
}
async fn update_topic(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
name: &str,
compression_algorithm: CompressionAlgorithm,
replication_factor: Option<u8>,
message_expiry: IggyExpiry,
max_topic_size: MaxTopicSize,
) -> Result<(), IggyError> {
self.put(
&get_details_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()),
&UpdateTopic {
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
name: name.to_string(),
compression_algorithm,
replication_factor,
message_expiry,
max_topic_size,
},
)
.await?;
Ok(())
}
async fn delete_topic(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<(), IggyError> {
self.delete(&get_details_path(
&stream_id.as_cow_str(),
&topic_id.as_cow_str(),
))
.await?;
Ok(())
}
async fn purge_topic(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<(), IggyError> {
self.delete(&format!(
"{}/purge",
&get_details_path(&stream_id.as_cow_str(), &topic_id.as_cow_str(),)
))
.await?;
Ok(())
}
}
fn get_path(stream_id: &str) -> String {
format!("streams/{stream_id}/topics")
}
fn get_details_path(stream_id: &str, topic_id: &str) -> String {
format!("{}/{topic_id}", get_path(stream_id))
}