blob: 457a707a271b2b144d49ef6482b1a73fc6258022 [file] [log] [blame]
use iggy::client::{Client, StreamClient, TopicClient, UserClient};
use iggy::clients::client::{IggyClient, IggyClientConfig};
use iggy::identifier::Identifier;
use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::streams::create_stream::CreateStream;
use iggy::tcp::client::TcpClient;
use iggy::tcp::config::TcpClientConfig;
use iggy::topics::create_topic::CreateTopic;
use iggy::users::defaults::*;
use iggy::users::login_user::LoginUser;
use std::env;
use std::error::Error;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
// Numeric identifier of the stream that is created in `init_system` and targeted by every send.
const STREAM_ID: u32 = 1;
// Numeric identifier of the topic that is created inside the stream and targeted by every send.
const TOPIC_ID: u32 = 1;
// Partition that all messages are explicitly routed to (the topic is created with one partition).
const PARTITION_ID: u32 = 1;
// Number of message batches `produce_messages` sends before it returns.
const BATCHES_LIMIT: u32 = 5;
/// Entry point of the sample producer.
///
/// Builds a TCP-based Iggy client for the address returned by `get_tcp_server_addr`, connects,
/// logs in with the default root credentials, makes sure the sample stream and topic exist and
/// then sends the sample message batches.
///
/// # Errors
///
/// Returns an error when the TCP client cannot be created, when connecting or logging in fails,
/// or when producing messages fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    tracing_subscriber::fmt::init();

    let tcp_client_config = TcpClientConfig {
        server_address: get_tcp_server_addr(),
        ..TcpClientConfig::default()
    };
    // Propagate a client construction failure like every other client error in this function
    // instead of panicking on it.
    let tcp_client = Box::new(TcpClient::create(Arc::new(tcp_client_config))?);
    let client = IggyClient::create(tcp_client, IggyClientConfig::default(), None, None, None);

    // Or, instead of the lines above, you can just use the code below, which will create an Iggy
    // TCP client with the default config (the default server address for TCP is 127.0.0.1:8090):
    // let client = IggyClient::default();

    client.connect().await?;
    client
        .login_user(&LoginUser {
            username: DEFAULT_ROOT_USERNAME.to_string(),
            password: DEFAULT_ROOT_PASSWORD.to_string(),
        })
        .await?;

    init_system(&client).await;
    produce_messages(&client).await
}
async fn init_system(client: &IggyClient) {
match client
.create_stream(&CreateStream {
stream_id: Some(STREAM_ID),
name: "sample-stream".to_string(),
})
.await
{
Ok(_) => info!("Stream was created."),
Err(_) => warn!("Stream already exists and will not be created again."),
}
match client
.create_topic(&CreateTopic {
stream_id: Identifier::numeric(STREAM_ID).unwrap(),
topic_id: Some(TOPIC_ID),
partitions_count: 1,
name: "sample-topic".to_string(),
message_expiry: None,
max_topic_size: None,
replication_factor: 1,
})
.await
{
Ok(_) => info!("Topic was created."),
Err(_) => warn!("Topic already exists and will not be created again."),
}
}
/// Sends `BATCHES_LIMIT` batches of ten messages each to the sample stream, topic and partition.
///
/// Message payloads are `message-<n>`, where `<n>` is a counter that starts at 1 and keeps
/// increasing across batches. After every batch the function waits 500 ms.
///
/// # Errors
///
/// Returns an error when a payload cannot be turned into a message, when an identifier cannot be
/// built, or when sending a batch fails.
async fn produce_messages(client: &dyn Client) -> Result<(), Box<dyn Error>> {
    let send_interval = Duration::from_millis(500);
    info!(
        "Messages will be sent to stream: {}, topic: {}, partition: {} with interval {} ms.",
        STREAM_ID,
        TOPIC_ID,
        PARTITION_ID,
        send_interval.as_millis()
    );

    let messages_per_batch = 10;
    let mut current_id = 0;
    let mut sent_batches = 0;

    while sent_batches < BATCHES_LIMIT {
        // Build the whole batch up front; the first payload that fails to convert aborts it.
        let batch = (0..messages_per_batch)
            .map(|_| {
                current_id += 1;
                let payload = format!("message-{current_id}");
                AppendableMessage::from_str(&payload)
            })
            .collect::<Result<Vec<_>, _>>()?;

        client
            .send_messages(&mut AppendMessages {
                stream_id: Identifier::numeric(STREAM_ID)?,
                topic_id: Identifier::numeric(TOPIC_ID)?,
                partitioning: Partitioning::partition_id(PARTITION_ID),
                messages: batch,
            })
            .await?;

        sent_batches += 1;
        info!("Sent {messages_per_batch} message(s).");
        sleep(send_interval).await;
    }

    info!("Sent {sent_batches} batches of messages, exiting.");
    Ok(())
}
/// Resolves the TCP server address from the command-line arguments.
///
/// Without any arguments the default address `127.0.0.1:8090` is returned. Otherwise the
/// arguments must be `--tcp-server-address <server-address>`, where `<server-address>` has to
/// parse as a [`std::net::SocketAddr`] (so an IP address and port, not a host name).
///
/// # Panics
///
/// Panics with a usage message when the first argument is not `--tcp-server-address`, when the
/// server address value is missing, or when it does not parse as a socket address.
fn get_tcp_server_addr() -> String {
    let mut args = env::args();
    // The first element is normally the program name; fall back to a placeholder so that
    // building a usage message can never itself panic.
    let program = args.next().unwrap_or_else(|| "<program>".to_string());

    let argument_name = match args.next() {
        Some(name) => name,
        None => return "127.0.0.1:8090".to_string(),
    };
    if argument_name != "--tcp-server-address" {
        panic!(
            "Invalid argument {}! Usage: {} --tcp-server-address <server-address>",
            argument_name, program
        );
    }

    // The flag without a value used to hit a bare `unwrap()` on `None`; report it with the same
    // usage text as the other argument errors.
    let tcp_server_addr = match args.next() {
        Some(address) => address,
        None => panic!(
            "Missing server address! Usage: {} --tcp-server-address <server-address>",
            program
        ),
    };
    if tcp_server_addr.parse::<std::net::SocketAddr>().is_err() {
        panic!(
            "Invalid server address {}! Usage: {} --tcp-server-address <server-address>",
            tcp_server_addr, program
        );
    }

    info!("Using server address: {}", tcp_server_addr);
    tcp_server_addr
}