| use clap::Parser; |
| use iggy::client::Client; |
| use iggy::client_provider; |
| use iggy::client_provider::ClientProviderConfig; |
| use iggy::clients::client::IggyClient; |
| use iggy::models::messages::PolledMessage; |
| use iggy_examples::shared::args::Args; |
| use iggy_examples::shared::system; |
| use std::error::Error; |
| use std::sync::Arc; |
| use tracing::info; |
| use tracing_subscriber::layer::SubscriberExt; |
| use tracing_subscriber::util::SubscriberInitExt; |
| use tracing_subscriber::{EnvFilter, Registry}; |
| |
| #[tokio::main] |
| async fn main() -> Result<(), Box<dyn Error>> { |
| let args = Args::parse(); |
| Registry::default() |
| .with(tracing_subscriber::fmt::layer()) |
| .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) |
| .init(); |
| info!( |
| "Basic consumer has started, selected transport: {}", |
| args.transport |
| ); |
| let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?); |
| let client = client_provider::get_raw_client(client_provider_config, false).await?; |
| let client = IggyClient::new(client); |
| client.connect().await?; |
| system::init_by_consumer(&args, &client).await; |
| system::consume_messages(&args, &client, &handle_message).await |
| } |
| |
| fn handle_message(message: &PolledMessage) -> Result<(), Box<dyn Error>> { |
| // The payload can be of any type as it is a raw byte array. In this case it's a simple string. |
| let payload = std::str::from_utf8(&message.payload)?; |
| info!( |
| "Handling message at offset: {}, payload: {}...", |
| message.offset, payload |
| ); |
| Ok(()) |
| } |