blob: a8155dc41e035705a5ca0e012350120fcc20e054 [file] [log] [blame]
use crate::cli_command::{CliCommand, PRINT_TARGET};
use crate::client::Client;
use crate::identifier::Identifier;
use crate::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use anyhow::Context;
use async_trait::async_trait;
use std::io::{self, Read};
use std::vec::Vec;
use tracing::{event, Level};
pub struct SendMessagesCmd {
stream_id: Identifier,
topic_id: Identifier,
partitioning: Partitioning,
messages: Option<Vec<String>>,
}
impl SendMessagesCmd {
pub fn new(
stream_id: Identifier,
topic_id: Identifier,
partition_id: Option<u32>,
message_key: Option<String>,
messages: Option<Vec<String>>,
) -> Self {
let partitioning = match (partition_id, message_key) {
(Some(_), Some(_)) => unreachable!(),
(Some(partition_id), None) => Partitioning::partition_id(partition_id),
(None, Some(message_key)) => Partitioning::messages_key_str(message_key.as_str())
.unwrap_or_else(|_| {
panic!(
"Failed to create Partitioning with {} string message key",
message_key
)
}),
(None, None) => Partitioning::default(),
};
Self {
stream_id,
topic_id,
partitioning,
messages,
}
}
fn read_message_from_stdin(&self) -> Result<String, io::Error> {
let mut buffer = String::new();
io::stdin().read_to_string(&mut buffer)?;
Ok(buffer)
}
}
#[async_trait]
impl CliCommand for SendMessagesCmd {
fn explain(&self) -> String {
format!(
"send messages to topic with ID: {} and stream with ID: {}",
self.topic_id, self.stream_id
)
}
async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
let messages = match &self.messages {
Some(messages) => messages
.iter()
.map(|s| AppendableMessage::new(None, s.clone().into(), None))
.collect::<Vec<_>>(),
None => {
let input = self.read_message_from_stdin()?;
input
.lines()
.map(|m| AppendableMessage::new(None, String::from(m).into(), None))
.collect()
}
};
client
.send_messages(&mut AppendMessages {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
partitioning: self.partitioning.clone(),
messages,
})
.await
.with_context(|| {
format!(
"Problem sending messages to topic with ID: {} and stream with ID: {}",
self.topic_id, self.stream_id
)
})?;
event!(target: PRINT_TARGET, Level::INFO,
"Sent messages to topic with ID: {} and stream with ID: {}",
self.topic_id,
self.stream_id,
);
Ok(())
}
}