blob: 33d6ee8a93f7c92d62895051b5b21cee8e8eac4c [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::shared::stream::PrintEventConsumer;
use iggy::consumer_ext::IggyConsumerMessageExt;
use iggy::prelude::*;
use iggy_examples::shared;
use std::str::FromStr;
use tokio::sync::oneshot;
/// Minimal end-to-end example of the high-level stream API.
///
/// The flow is:
/// 1. Build a client through the shared example helper.
/// 2. Build a producer/consumer pair from the default [`IggyStreamConfig`].
/// 3. Spawn a background task that consumes messages with `PrintEventConsumer`
///    until it receives a signal on a oneshot channel.
/// 4. Send three text messages, wait one second, signal the consumer to stop,
///    then delete the stream and shut the client down.
///
/// # Errors
///
/// Returns an [`IggyError`] if building the client or the producer/consumer
/// pair, creating or sending a message, deleting the stream, or shutting down
/// the client fails. Errors from the background consumer task are only printed
/// to stderr; they do not affect the return value.
#[tokio::main]
async fn main() -> Result<(), IggyError> {
    println!("Build iggy client and connect it.");
    let client = shared::client::build_client("test_stream", "test_topic", true).await?;

    println!("Build iggy producer & consumer");
    // For customization, use the `new` or `from_stream_topic` constructor
    let stream_config = IggyStreamConfig::default();
    let (producer, mut consumer) = IggyStream::build(&client, &stream_config).await?;

    println!("Start message stream");
    // The sender half stays here so `main` can ask the consumer task to stop.
    let (sender, receiver) = oneshot::channel();
    tokio::spawn(async move {
        // PrintEventConsumer is imported from examples/src/shared/stream.rs
        if let Err(err) = consumer
            .consume_messages(&PrintEventConsumer {}, receiver)
            .await
        {
            eprintln!("Failed to consume messages: {err}");
        }
    });

    println!("Send 3 test messages...");
    for payload in ["Hello World", "Hola Iggy", "Hi Apache"] {
        producer.send_one(IggyMessage::from_str(payload)?).await?;
    }

    // Wait a bit for all messages to arrive.
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    println!("Stop the message stream and shutdown iggy client");
    // `send` only fails when the receiving half has already been dropped, which
    // presumably means the consumer task has ended on its own (e.g. after the
    // error logged above). That must not abort the cleanup below, so report it
    // instead of panicking.
    if sender.send(()).is_err() {
        eprintln!("Failed to send shutdown signal: consumer is no longer listening");
    }

    client.delete_stream(stream_config.stream_id()).await?;
    client.shutdown().await?;
    Ok(())
}