blob: 0e2da92801787fed68d4023a5b97ae76a06d6c37 [file] [log] [blame]
use anyhow::Result;
use bytes::Bytes;
use clap::Parser;
use iggy::client::MessageClient;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
use iggy::identifier::Identifier;
use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::models::header::{HeaderKey, HeaderValue};
use iggy_examples::shared::args::Args;
use iggy_examples::shared::messages_generator::MessagesGenerator;
use iggy_examples::shared::system;
use std::collections::HashMap;
use std::error::Error;
use std::str::FromStr;
use std::sync::Arc;
use tracing::info;
/// Entry point of the message headers producer example.
///
/// Parses the command-line arguments, initializes tracing, builds an
/// `IggyClient` on top of the raw client selected by the arguments, calls
/// `system::login_root` and `system::init_by_producer`, and then hands
/// control over to [`produce_messages`].
///
/// # Errors
///
/// Propagates any error returned while building the client provider
/// configuration, creating the raw client, initializing the system or
/// producing messages.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let args = Args::parse();
    tracing_subscriber::fmt::init();
    info!(
        "Message headers producer has started, selected transport: {}",
        args.transport
    );

    // Build the transport-specific raw client first, then wrap it.
    let sdk_args = args.to_sdk_args();
    let provider_config = ClientProviderConfig::from_args(sdk_args)?;
    let raw_client = client_provider::get_raw_client(Arc::new(provider_config)).await?;
    let client = IggyClient::builder(raw_client).build();

    system::login_root(&client).await;
    system::init_by_producer(&args, &client).await?;

    produce_messages(&args, &client).await?;
    Ok(())
}
/// Generates batches of messages and sends them to the configured stream,
/// topic and partition, attaching the message type as a custom header.
///
/// Each batch contains `args.messages_per_batch` messages. Batches are paced
/// by a `tokio` interval of `args.interval` milliseconds: the first batch is
/// sent immediately (the first tick of an interval completes right away) and
/// every following batch waits for the next tick. When
/// `args.message_batches_limit` is greater than zero, the function returns as
/// soon as that many batches have been sent; otherwise it loops forever.
///
/// # Errors
///
/// Returns an error when a header key/value or an identifier cannot be
/// created, or when sending a batch fails.
///
/// # Panics
///
/// `tokio::time::interval` panics when given a zero period, so
/// `args.interval` must be greater than zero.
async fn produce_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dyn Error>> {
    info!(
        "Messages will be sent to stream: {}, topic: {}, partition: {} with interval {} ms.",
        args.stream_id, args.topic_id, args.partition_id, args.interval
    );
    let mut interval = tokio::time::interval(std::time::Duration::from_millis(args.interval));
    let mut message_generator = MessagesGenerator::new();
    let mut sent_batches = 0;
    loop {
        if args.message_batches_limit > 0 && sent_batches == args.message_batches_limit {
            info!("Sent {sent_batches} batches of messages, exiting.");
            return Ok(());
        }

        // Pace the loop *before* sending: the first tick resolves immediately,
        // so the first batch goes out right away and each later batch is
        // separated by one interval. Ticking after the send would let the
        // first two batches go out back-to-back and would delay the exit
        // after the final batch by a full interval.
        interval.tick().await;

        let mut messages = Vec::new();
        let mut serializable_messages = Vec::new();
        for _ in 0..args.messages_per_batch {
            let serializable_message = message_generator.generate();
            // You can send different message types to the same partition, or stick to a single type.
            let message_type = serializable_message.get_message_type();
            let json = serializable_message.to_json();

            // The message type is stored in a custom message header.
            // Failures are propagated instead of panicking, consistent with
            // the rest of this function.
            let mut headers = HashMap::new();
            headers.insert(
                HeaderKey::new("message_type")?,
                HeaderValue::from_str(message_type)?,
            );

            let message = AppendableMessage::new(None, Bytes::from(json), Some(headers));
            messages.push(message);
            // Kept for logging purposes only.
            serializable_messages.push(serializable_message);
        }

        client
            .send_messages(&mut AppendMessages {
                stream_id: Identifier::numeric(args.stream_id)?,
                topic_id: Identifier::numeric(args.topic_id)?,
                partitioning: Partitioning::partition_id(args.partition_id),
                messages,
            })
            .await?;
        sent_batches += 1;
        info!("Sent messages: {:#?}", serializable_messages);
    }
}