blob: 627b6ccb1aabeb67fcfa6e173dc2aa9c434a8df5 [file] [log] [blame]
use clap::Parser;
use iggy::client::Client;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
use iggy::models::messages::PolledMessage;
use iggy_examples::shared::args::Args;
use iggy_examples::shared::system;
use std::error::Error;
use std::sync::Arc;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Registry};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::parse();
Registry::default()
.with(tracing_subscriber::fmt::layer())
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
.init();
info!(
"Basic consumer has started, selected transport: {}",
args.transport
);
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::new(client);
client.connect().await?;
system::init_by_consumer(&args, &client).await;
system::consume_messages(&args, &client, &handle_message).await
}
fn handle_message(message: &PolledMessage) -> Result<(), Box<dyn Error>> {
// The payload can be of any type as it is a raw byte array. In this case it's a simple string.
let payload = std::str::from_utf8(&message.payload)?;
info!(
"Handling message at offset: {}, payload: {}...",
message.offset, payload
);
Ok(())
}