Rename `Message` to `PolledMessage` and `AppendableMessage` respectively
diff --git a/bench/src/producer.rs b/bench/src/producer.rs
index 13f808d..e5b1e14 100644
--- a/bench/src/producer.rs
+++ b/bench/src/producer.rs
@@ -4,7 +4,7 @@
use iggy::clients::client::{IggyClient, IggyClientConfig};
use iggy::error::IggyError;
use iggy::identifier::Identifier;
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use integration::test_server::{login_root, ClientFactory};
use std::str::FromStr;
use std::sync::Arc;
@@ -54,11 +54,11 @@
let payload = Self::create_payload(self.message_size);
let mut messages = Vec::with_capacity(self.messages_per_batch as usize);
for _ in 0..self.messages_per_batch {
- let message = Message::from_str(&payload).unwrap();
+ let message = AppendableMessage::from_str(&payload).unwrap();
messages.push(message);
}
- let mut send_messages = SendMessages {
+ let mut send_messages = AppendMessages {
stream_id: Identifier::numeric(self.stream_id)?,
topic_id: Identifier::numeric(topic_id)?,
partitioning: Partitioning::partition_id(partition_id),
diff --git a/examples/src/basic/consumer/main.rs b/examples/src/basic/consumer/main.rs
index cc0540a..e993fbf 100644
--- a/examples/src/basic/consumer/main.rs
+++ b/examples/src/basic/consumer/main.rs
@@ -1,7 +1,7 @@
use clap::Parser;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use iggy_examples::shared::args::Args;
use iggy_examples::shared::system;
use std::error::Error;
@@ -24,7 +24,7 @@
system::consume_messages(&args, client, &handle_message).await
}
-fn handle_message(message: &Message) -> Result<(), Box<dyn Error>> {
+fn handle_message(message: &PolledMessage) -> Result<(), Box<dyn Error>> {
// The payload can be of any type as it is a raw byte array. In this case it's a simple string.
let payload = std::str::from_utf8(&message.payload)?;
info!(
diff --git a/examples/src/basic/producer/main.rs b/examples/src/basic/producer/main.rs
index 47f9766..40cd658 100644
--- a/examples/src/basic/producer/main.rs
+++ b/examples/src/basic/producer/main.rs
@@ -3,7 +3,7 @@
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::identifier::Identifier;
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy_examples::shared::args::Args;
use iggy_examples::shared::system;
use std::error::Error;
@@ -46,12 +46,12 @@
for _ in 0..args.messages_per_batch {
current_id += 1;
let payload = format!("message-{current_id}");
- let message = Message::from_str(&payload)?;
+ let message = AppendableMessage::from_str(&payload)?;
messages.push(message);
sent_messages.push(payload);
}
client
- .send_messages(&mut SendMessages {
+ .send_messages(&mut AppendMessages {
stream_id: Identifier::numeric(args.stream_id)?,
topic_id: Identifier::numeric(args.topic_id)?,
partitioning: Partitioning::partition_id(args.partition_id),
diff --git a/examples/src/getting-started/consumer/main.rs b/examples/src/getting-started/consumer/main.rs
index a65fa65..84883ab 100644
--- a/examples/src/getting-started/consumer/main.rs
+++ b/examples/src/getting-started/consumer/main.rs
@@ -3,7 +3,7 @@
use iggy::consumer::Consumer;
use iggy::identifier::Identifier;
use iggy::messages::poll_messages::{PollMessages, PollingStrategy};
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use iggy::tcp::client::TcpClient;
use iggy::tcp::config::TcpClientConfig;
use iggy::users::defaults::*;
@@ -90,7 +90,7 @@
}
}
-fn handle_message(message: &Message) -> Result<(), Box<dyn Error>> {
+fn handle_message(message: &PolledMessage) -> Result<(), Box<dyn Error>> {
// The payload can be of any type as it is a raw byte array. In this case it's a simple string.
let payload = std::str::from_utf8(&message.payload)?;
info!(
diff --git a/examples/src/getting-started/producer/main.rs b/examples/src/getting-started/producer/main.rs
index 64204b4..457a707 100644
--- a/examples/src/getting-started/producer/main.rs
+++ b/examples/src/getting-started/producer/main.rs
@@ -1,7 +1,7 @@
use iggy::client::{Client, StreamClient, TopicClient, UserClient};
use iggy::clients::client::{IggyClient, IggyClientConfig};
use iggy::identifier::Identifier;
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::streams::create_stream::CreateStream;
use iggy::tcp::client::TcpClient;
use iggy::tcp::config::TcpClientConfig;
@@ -99,11 +99,11 @@
for _ in 0..messages_per_batch {
current_id += 1;
let payload = format!("message-{current_id}");
- let message = Message::from_str(&payload)?;
+ let message = AppendableMessage::from_str(&payload)?;
messages.push(message);
}
client
- .send_messages(&mut SendMessages {
+ .send_messages(&mut AppendMessages {
stream_id: Identifier::numeric(STREAM_ID)?,
topic_id: Identifier::numeric(TOPIC_ID)?,
partitioning: Partitioning::partition_id(PARTITION_ID),
diff --git a/examples/src/message-envelope/consumer/main.rs b/examples/src/message-envelope/consumer/main.rs
index c573339..1c725d5 100644
--- a/examples/src/message-envelope/consumer/main.rs
+++ b/examples/src/message-envelope/consumer/main.rs
@@ -3,7 +3,7 @@
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::{IggyClient, IggyClientConfig, PollMessagesConfig, StoreOffsetKind};
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use iggy_examples::shared::args::Args;
use iggy_examples::shared::messages::*;
use iggy_examples::shared::system;
@@ -35,7 +35,7 @@
system::consume_messages(&args, &client, &handle_message).await
}
-fn handle_message(message: &Message) -> Result<(), Box<dyn Error>> {
+fn handle_message(message: &PolledMessage) -> Result<(), Box<dyn Error>> {
// The payload can be of any type as it is a raw byte array. In this case it's a JSON string.
let json = std::str::from_utf8(&message.payload)?;
// The message envelope can be used to send the different types of messages to the same topic.
diff --git a/examples/src/message-envelope/producer/main.rs b/examples/src/message-envelope/producer/main.rs
index fdc857a..2a398de 100644
--- a/examples/src/message-envelope/producer/main.rs
+++ b/examples/src/message-envelope/producer/main.rs
@@ -5,7 +5,7 @@
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
use iggy::identifier::Identifier;
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy_examples::shared::args::Args;
use iggy_examples::shared::messages_generator::MessagesGenerator;
use iggy_examples::shared::system;
@@ -50,13 +50,13 @@
let serializable_message = message_generator.generate();
// You can send the different message types to the same partition, or stick to the single type.
let json_envelope = serializable_message.to_json_envelope();
- let message = Message::from_str(&json_envelope)?;
+ let message = AppendableMessage::from_str(&json_envelope)?;
messages.push(message);
// This is used for the logging purposes only.
serializable_messages.push(serializable_message);
}
client
- .send_messages(&mut SendMessages {
+ .send_messages(&mut AppendMessages {
stream_id: Identifier::numeric(args.stream_id)?,
topic_id: Identifier::numeric(args.topic_id)?,
partitioning: Partitioning::partition_id(args.partition_id),
diff --git a/examples/src/message-headers/consumer/main.rs b/examples/src/message-headers/consumer/main.rs
index 951f195..cf066f3 100644
--- a/examples/src/message-headers/consumer/main.rs
+++ b/examples/src/message-headers/consumer/main.rs
@@ -4,7 +4,7 @@
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::{IggyClient, IggyClientConfig, PollMessagesConfig, StoreOffsetKind};
use iggy::models::header::HeaderKey;
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use iggy_examples::shared::args::Args;
use iggy_examples::shared::messages::*;
use iggy_examples::shared::system;
@@ -36,7 +36,7 @@
system::consume_messages(&args, &client, &handle_message).await
}
-fn handle_message(message: &Message) -> Result<(), Box<dyn Error>> {
+fn handle_message(message: &PolledMessage) -> Result<(), Box<dyn Error>> {
// The payload can be of any type as it is a raw byte array. In this case it's a JSON string.
let payload = std::str::from_utf8(&message.payload)?;
// The message type is stored in the custom message header.
diff --git a/examples/src/message-headers/producer/main.rs b/examples/src/message-headers/producer/main.rs
index a5d881a..0e2da92 100644
--- a/examples/src/message-headers/producer/main.rs
+++ b/examples/src/message-headers/producer/main.rs
@@ -6,7 +6,7 @@
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
use iggy::identifier::Identifier;
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::models::header::{HeaderKey, HeaderValue};
use iggy_examples::shared::args::Args;
use iggy_examples::shared::messages_generator::MessagesGenerator;
@@ -62,13 +62,13 @@
HeaderValue::from_str(message_type).unwrap(),
);
- let message = Message::new(None, Bytes::from(json), Some(headers));
+ let message = AppendableMessage::new(None, Bytes::from(json), Some(headers));
messages.push(message);
// This is used for the logging purposes only.
serializable_messages.push(serializable_message);
}
client
- .send_messages(&mut SendMessages {
+ .send_messages(&mut AppendMessages {
stream_id: Identifier::numeric(args.stream_id)?,
topic_id: Identifier::numeric(args.topic_id)?,
partitioning: Partitioning::partition_id(args.partition_id),
diff --git a/examples/src/shared/system.rs b/examples/src/shared/system.rs
index c4e711d..4b92e62 100644
--- a/examples/src/shared/system.rs
+++ b/examples/src/shared/system.rs
@@ -4,7 +4,7 @@
use iggy::error::IggyError;
use iggy::identifier::Identifier;
use iggy::messages::poll_messages::{PollMessages, PollingStrategy};
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use iggy::streams::create_stream::CreateStream;
use iggy::streams::get_stream::GetStream;
use iggy::topics::create_topic::CreateTopic;
@@ -12,7 +12,7 @@
use iggy::users::defaults::*;
use iggy::users::login_user::LoginUser;
use tracing::info;
-type MessageHandler = dyn Fn(&Message) -> Result<(), Box<dyn std::error::Error>>;
+type MessageHandler = dyn Fn(&PolledMessage) -> Result<(), Box<dyn std::error::Error>>;
pub async fn login_root(client: &dyn Client) {
client
diff --git a/integration/tests/cli/message/test_message_poll_command.rs b/integration/tests/cli/message/test_message_poll_command.rs
index 2953d1a..b17eab2 100644
--- a/integration/tests/cli/message/test_message_poll_command.rs
+++ b/integration/tests/cli/message/test_message_poll_command.rs
@@ -4,8 +4,8 @@
};
use assert_cmd::assert::Assert;
use async_trait::async_trait;
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::messages::poll_messages::{PollingKind, PollingStrategy};
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
use iggy::topics::create_topic::CreateTopic;
@@ -121,11 +121,11 @@
let messages = self
.messages
.iter()
- .filter_map(|s| Message::from_str(s).ok())
+ .filter_map(|s| AppendableMessage::from_str(s).ok())
.collect::<Vec<_>>();
let send_status = client
- .send_messages(&mut SendMessages {
+ .send_messages(&mut AppendMessages {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Identifier::numeric(self.topic_id).unwrap(),
partitioning: Partitioning::partition_id(self.partition_id),
diff --git a/integration/tests/cli/stream/test_stream_purge_command.rs b/integration/tests/cli/stream/test_stream_purge_command.rs
index 41e6f41..97bdd5c 100644
--- a/integration/tests/cli/stream/test_stream_purge_command.rs
+++ b/integration/tests/cli/stream/test_stream_purge_command.rs
@@ -6,7 +6,7 @@
use async_trait::async_trait;
use iggy::client::Client;
use iggy::identifier::Identifier;
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
use iggy::streams::get_stream::GetStream;
@@ -68,11 +68,11 @@
let messages = (1..100)
.map(|n| format!("message {}", n))
- .filter_map(|s| Message::from_str(s.as_str()).ok())
+ .filter_map(|s| AppendableMessage::from_str(s.as_str()).ok())
.collect::<Vec<_>>();
let send_status = client
- .send_messages(&mut SendMessages {
+ .send_messages(&mut AppendMessages {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Identifier::numeric(self.topic_id).unwrap(),
partitioning: Partitioning::default(),
diff --git a/integration/tests/cli/topic/test_topic_purge_command.rs b/integration/tests/cli/topic/test_topic_purge_command.rs
index 19f534c..52ed0b8 100644
--- a/integration/tests/cli/topic/test_topic_purge_command.rs
+++ b/integration/tests/cli/topic/test_topic_purge_command.rs
@@ -4,7 +4,7 @@
};
use assert_cmd::assert::Assert;
use async_trait::async_trait;
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
use iggy::topics::create_topic::CreateTopic;
@@ -84,11 +84,11 @@
let messages = (1..100)
.map(|n| format!("message {}", n))
- .filter_map(|s| Message::from_str(s.as_str()).ok())
+ .filter_map(|s| AppendableMessage::from_str(s.as_str()).ok())
.collect::<Vec<_>>();
let send_status = client
- .send_messages(&mut SendMessages {
+ .send_messages(&mut AppendMessages {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Identifier::numeric(self.topic_id).unwrap(),
partitioning: Partitioning::default(),
diff --git a/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs b/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
index e1f90fe..571850e 100644
--- a/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
+++ b/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
@@ -5,8 +5,8 @@
use iggy::consumer_groups::get_consumer_group::GetConsumerGroup;
use iggy::consumer_groups::join_consumer_group::JoinConsumerGroup;
use iggy::identifier::Identifier;
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::messages::poll_messages::{PollMessages, PollingStrategy};
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
use iggy::models::consumer_group::ConsumerGroupDetails;
use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
@@ -133,9 +133,9 @@
) {
// 1. Send messages to the calculated partition ID on the server side by using entity ID as a key
for entity_id in 1..=MESSAGES_COUNT {
- let message = Message::from_str(&get_message_payload(entity_id)).unwrap();
+ let message = AppendableMessage::from_str(&get_message_payload(entity_id)).unwrap();
let messages = vec![message];
- let mut send_messages = SendMessages {
+ let mut send_messages = AppendMessages {
stream_id: Identifier::numeric(STREAM_ID).unwrap(),
topic_id: Identifier::numeric(TOPIC_ID).unwrap(),
partitioning: Partitioning::messages_key_u32(entity_id),
@@ -194,9 +194,10 @@
}
let message =
- Message::from_str(&get_extended_message_payload(partition_id, entity_id)).unwrap();
+ AppendableMessage::from_str(&get_extended_message_payload(partition_id, entity_id))
+ .unwrap();
let messages = vec![message];
- let mut send_messages = SendMessages {
+ let mut send_messages = AppendMessages {
stream_id: Identifier::numeric(STREAM_ID).unwrap(),
topic_id: Identifier::numeric(TOPIC_ID).unwrap(),
partitioning: Partitioning::balanced(),
diff --git a/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs b/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
index 247bf30..2423fab 100644
--- a/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
+++ b/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
@@ -5,8 +5,8 @@
use iggy::consumer_groups::get_consumer_group::GetConsumerGroup;
use iggy::consumer_groups::join_consumer_group::JoinConsumerGroup;
use iggy::identifier::Identifier;
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::messages::poll_messages::{PollMessages, PollingStrategy};
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
use iggy::system::get_me::GetMe;
@@ -104,9 +104,9 @@
async fn execute_using_messages_key_key(client: &IggyClient) {
// 1. Send messages to the calculated partition ID on the server side by using entity ID as a key
for entity_id in 1..=MESSAGES_COUNT {
- let message = Message::from_str(&get_message_payload(entity_id)).unwrap();
+ let message = AppendableMessage::from_str(&get_message_payload(entity_id)).unwrap();
let messages = vec![message];
- let mut send_messages = SendMessages {
+ let mut send_messages = AppendMessages {
stream_id: Identifier::numeric(STREAM_ID).unwrap(),
topic_id: Identifier::numeric(TOPIC_ID).unwrap(),
partitioning: Partitioning::messages_key_u32(entity_id),
@@ -148,9 +148,10 @@
}
let message =
- Message::from_str(&get_extended_message_payload(partition_id, entity_id)).unwrap();
+ AppendableMessage::from_str(&get_extended_message_payload(partition_id, entity_id))
+ .unwrap();
let messages = vec![message];
- let mut send_messages = SendMessages {
+ let mut send_messages = AppendMessages {
stream_id: Identifier::numeric(STREAM_ID).unwrap(),
topic_id: Identifier::numeric(TOPIC_ID).unwrap(),
partitioning: Partitioning::balanced(),
diff --git a/integration/tests/server/scenarios/message_headers_scenario.rs b/integration/tests/server/scenarios/message_headers_scenario.rs
index 306ef1c..213e5e4 100644
--- a/integration/tests/server/scenarios/message_headers_scenario.rs
+++ b/integration/tests/server/scenarios/message_headers_scenario.rs
@@ -3,8 +3,8 @@
use iggy::clients::client::{IggyClient, IggyClientConfig};
use iggy::consumer::Consumer;
use iggy::identifier::Identifier;
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::messages::poll_messages::{PollMessages, PollingStrategy};
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
use iggy::models::header::{HeaderKey, HeaderValue};
use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
@@ -34,7 +34,7 @@
let id = (offset + 1) as u128;
let payload = get_message_payload(offset as u64);
let headers = get_message_headers();
- messages.push(Message {
+ messages.push(AppendableMessage {
id,
length: payload.len() as u32,
payload,
@@ -42,7 +42,7 @@
});
}
- let mut send_messages = SendMessages {
+ let mut send_messages = AppendMessages {
stream_id: Identifier::numeric(STREAM_ID).unwrap(),
topic_id: Identifier::numeric(TOPIC_ID).unwrap(),
partitioning: Partitioning::partition_id(PARTITION_ID),
diff --git a/integration/tests/server/scenarios/system_scenario.rs b/integration/tests/server/scenarios/system_scenario.rs
index 82f1cc2..87e76ab 100644
--- a/integration/tests/server/scenarios/system_scenario.rs
+++ b/integration/tests/server/scenarios/system_scenario.rs
@@ -15,8 +15,8 @@
use iggy::consumer_offsets::store_consumer_offset::StoreConsumerOffset;
use iggy::error::IggyError;
use iggy::identifier::Identifier;
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::messages::poll_messages::{PollMessages, PollingStrategy};
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
use iggy::partitions::create_partitions::CreatePartitions;
use iggy::partitions::delete_partitions::DeletePartitions;
use iggy::streams::create_stream::CreateStream;
@@ -222,7 +222,7 @@
// 17. Send messages to the specific topic and partition
let messages = create_messages();
- let mut send_messages = SendMessages {
+ let mut send_messages = AppendMessages {
stream_id: Identifier::numeric(STREAM_ID).unwrap(),
topic_id: Identifier::numeric(TOPIC_ID).unwrap(),
partitioning: Partitioning::partition_id(PARTITION_ID),
@@ -649,7 +649,7 @@
// 39. Purge the existing stream and ensure it has no messages
let messages = create_messages();
- let mut send_messages = SendMessages {
+ let mut send_messages = AppendMessages {
stream_id: Identifier::numeric(STREAM_ID).unwrap(),
topic_id: Identifier::numeric(TOPIC_ID).unwrap(),
partitioning: Partitioning::partition_id(PARTITION_ID),
@@ -753,19 +753,19 @@
assert_clean_system(&client).await;
}
-fn assert_message(message: &iggy::models::messages::Message, offset: u64) {
+fn assert_message(message: &iggy::models::polled_messages::PolledMessage, offset: u64) {
let expected_payload = get_message_payload(offset);
assert!(message.timestamp > 0);
assert_eq!(message.offset, offset);
assert_eq!(message.payload, expected_payload);
}
-fn create_messages() -> Vec<Message> {
+fn create_messages() -> Vec<AppendableMessage> {
let mut messages = Vec::new();
for offset in 0..MESSAGES_COUNT {
let id = (offset + 1) as u128;
let payload = get_message_payload(offset as u64);
- messages.push(Message {
+ messages.push(AppendableMessage {
id,
length: payload.len() as u32,
payload,
diff --git a/integration/tests/streaming/messages.rs b/integration/tests/streaming/messages.rs
index 81ac74e..7fcf740 100644
--- a/integration/tests/streaming/messages.rs
+++ b/integration/tests/streaming/messages.rs
@@ -1,7 +1,7 @@
use crate::streaming::common::test_setup::TestSetup;
use bytes::Bytes;
use iggy::models::header::{HeaderKey, HeaderValue};
-use iggy::models::messages::{Message, MessageState};
+use iggy::models::polled_messages::{MessageState, PolledMessage};
use iggy::utils::{checksum, timestamp::IggyTimestamp};
use server::configs::system::{PartitionConfig, SystemConfig};
use server::streaming::partitions::partition::Partition;
@@ -56,7 +56,7 @@
HeaderKey::new("key-3").unwrap(),
HeaderValue::from_uint64(123456).unwrap(),
);
- let appended_message = Message::create(
+ let appended_message = PolledMessage::create(
offset,
state,
timestamp,
@@ -65,7 +65,7 @@
checksum,
Some(headers.clone()),
);
- let message = Message::create(
+ let message = PolledMessage::create(
offset,
state,
timestamp,
diff --git a/integration/tests/streaming/mod.rs b/integration/tests/streaming/mod.rs
index b52769a..1b096ce 100644
--- a/integration/tests/streaming/mod.rs
+++ b/integration/tests/streaming/mod.rs
@@ -1,5 +1,5 @@
use bytes::Bytes;
-use iggy::models::messages::{Message, MessageState};
+use iggy::models::polled_messages::{MessageState, PolledMessage};
use iggy::utils::checksum;
mod common;
@@ -15,7 +15,7 @@
mod topic_messages;
mod user;
-fn create_messages() -> Vec<Message> {
+fn create_messages() -> Vec<PolledMessage> {
vec![
create_message(0, 1, "message 1"),
create_message(1, 2, "message 2"),
@@ -26,10 +26,10 @@
]
}
-fn create_message(offset: u64, id: u128, payload: &str) -> Message {
+fn create_message(offset: u64, id: u128, payload: &str) -> PolledMessage {
let payload = Bytes::from(payload.to_string());
let checksum = checksum::calculate(payload.as_ref());
- Message::create(
+ PolledMessage::create(
offset,
MessageState::Available,
1,
diff --git a/integration/tests/streaming/segment.rs b/integration/tests/streaming/segment.rs
index fb9c503..3eef9f2 100644
--- a/integration/tests/streaming/segment.rs
+++ b/integration/tests/streaming/segment.rs
@@ -1,6 +1,6 @@
use crate::streaming::common::test_setup::TestSetup;
use bytes::Bytes;
-use iggy::models::messages::{Message, MessageState};
+use iggy::models::polled_messages::{MessageState, PolledMessage};
use iggy::utils::{checksum, timestamp::IggyTimestamp};
use server::streaming::segments::segment;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION, TIME_INDEX_EXTENSION};
@@ -254,10 +254,10 @@
assert!(fs::metadata(&time_index_path).await.is_ok());
}
-fn create_message(offset: u64, payload: &str, timestamp: u64) -> Message {
+fn create_message(offset: u64, payload: &str, timestamp: u64) -> PolledMessage {
let payload = Bytes::from(payload.to_string());
let checksum = checksum::calculate(payload.as_ref());
- Message::create(
+ PolledMessage::create(
offset,
MessageState::Available,
timestamp,
diff --git a/integration/tests/streaming/stream.rs b/integration/tests/streaming/stream.rs
index e69cbe9..91c283d 100644
--- a/integration/tests/streaming/stream.rs
+++ b/integration/tests/streaming/stream.rs
@@ -1,8 +1,8 @@
use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
use iggy::identifier::Identifier;
+use iggy::messages::append_messages::Partitioning;
use iggy::messages::poll_messages::PollingStrategy;
-use iggy::messages::send_messages::Partitioning;
use server::streaming::polling_consumer::PollingConsumer;
use server::streaming::streams::stream::Stream;
use tokio::fs;
diff --git a/integration/tests/streaming/topic.rs b/integration/tests/streaming/topic.rs
index 46aaaca..d480a3d 100644
--- a/integration/tests/streaming/topic.rs
+++ b/integration/tests/streaming/topic.rs
@@ -1,7 +1,7 @@
use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
+use iggy::messages::append_messages::Partitioning;
use iggy::messages::poll_messages::PollingStrategy;
-use iggy::messages::send_messages::Partitioning;
use server::streaming::polling_consumer::PollingConsumer;
use server::streaming::topics::topic::Topic;
use tokio::fs;
diff --git a/integration/tests/streaming/topic_messages.rs b/integration/tests/streaming/topic_messages.rs
index 37b5813..d6cdbe1 100644
--- a/integration/tests/streaming/topic_messages.rs
+++ b/integration/tests/streaming/topic_messages.rs
@@ -1,8 +1,8 @@
use crate::streaming::common::test_setup::TestSetup;
+use iggy::messages::append_messages;
+use iggy::messages::append_messages::Partitioning;
use iggy::messages::poll_messages::PollingStrategy;
-use iggy::messages::send_messages;
-use iggy::messages::send_messages::Partitioning;
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use iggy::utils::byte_size::IggyByteSize;
use server::configs::resource_quota::MemoryResourceQuota;
use server::configs::system::{CacheConfig, SystemConfig};
@@ -212,8 +212,8 @@
topic
}
-fn get_message(payload: &str) -> Message {
- Message::from_message(&send_messages::Message::from_str(payload).unwrap())
+fn get_message(payload: &str) -> PolledMessage {
+ PolledMessage::from_message(&append_messages::AppendableMessage::from_str(payload).unwrap())
}
fn create_payload(size: u32) -> String {
diff --git a/sdk/src/binary/mapper.rs b/sdk/src/binary/mapper.rs
index d0d1ab7..a9d6079 100644
--- a/sdk/src/binary/mapper.rs
+++ b/sdk/src/binary/mapper.rs
@@ -4,10 +4,10 @@
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
-use crate::models::messages::{Message, MessageState, PolledMessages};
use crate::models::partition::Partition;
use crate::models::permissions::Permissions;
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
+use crate::models::polled_messages::{MessageState, PolledMessage, PolledMessages};
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
@@ -18,7 +18,7 @@
use std::collections::HashMap;
use std::str::from_utf8;
-const EMPTY_MESSAGES: Vec<Message> = vec![];
+const EMPTY_MESSAGES: Vec<PolledMessage> = vec![];
const EMPTY_TOPICS: Vec<Topic> = vec![];
const EMPTY_STREAMS: Vec<Stream> = vec![];
const EMPTY_CLIENTS: Vec<ClientInfo> = vec![];
@@ -264,7 +264,7 @@
let payload = payload[payload_range].to_vec();
let total_size = 45 + message_length as usize;
position += total_size;
- messages.push(Message {
+ messages.push(PolledMessage {
offset,
timestamp,
state,
diff --git a/sdk/src/binary/messages.rs b/sdk/src/binary/messages.rs
index 0f95ad7..b22e843 100644
--- a/sdk/src/binary/messages.rs
+++ b/sdk/src/binary/messages.rs
@@ -4,9 +4,9 @@
use crate::client::MessageClient;
use crate::command::{POLL_MESSAGES_CODE, SEND_MESSAGES_CODE};
use crate::error::IggyError;
+use crate::messages::append_messages::AppendMessages;
use crate::messages::poll_messages::PollMessages;
-use crate::messages::send_messages::SendMessages;
-use crate::models::messages::PolledMessages;
+use crate::models::polled_messages::PolledMessages;
#[async_trait::async_trait]
impl<B: BinaryClient> MessageClient for B {
@@ -18,7 +18,7 @@
mapper::map_polled_messages(&response)
}
- async fn send_messages(&self, command: &mut SendMessages) -> Result<(), IggyError> {
+ async fn send_messages(&self, command: &mut AppendMessages) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(SEND_MESSAGES_CODE, &command.as_bytes())
.await?;
diff --git a/sdk/src/cli/message/send_messages.rs b/sdk/src/cli/message/send_messages.rs
index 4e3b94c..a8155dc 100644
--- a/sdk/src/cli/message/send_messages.rs
+++ b/sdk/src/cli/message/send_messages.rs
@@ -1,7 +1,7 @@
use crate::cli_command::{CliCommand, PRINT_TARGET};
use crate::client::Client;
use crate::identifier::Identifier;
-use crate::messages::send_messages::{Message, Partitioning, SendMessages};
+use crate::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use anyhow::Context;
use async_trait::async_trait;
use std::io::{self, Read};
@@ -65,20 +65,20 @@
let messages = match &self.messages {
Some(messages) => messages
.iter()
- .map(|s| Message::new(None, s.clone().into(), None))
+ .map(|s| AppendableMessage::new(None, s.clone().into(), None))
.collect::<Vec<_>>(),
None => {
let input = self.read_message_from_stdin()?;
input
.lines()
- .map(|m| Message::new(None, String::from(m).into(), None))
+ .map(|m| AppendableMessage::new(None, String::from(m).into(), None))
.collect()
}
};
client
- .send_messages(&mut SendMessages {
+ .send_messages(&mut AppendMessages {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
partitioning: self.partitioning.clone(),
diff --git a/sdk/src/client.rs b/sdk/src/client.rs
index 737ef5d..9e512f7 100644
--- a/sdk/src/client.rs
+++ b/sdk/src/client.rs
@@ -7,14 +7,14 @@
use crate::consumer_offsets::get_consumer_offset::GetConsumerOffset;
use crate::consumer_offsets::store_consumer_offset::StoreConsumerOffset;
use crate::error::IggyError;
+use crate::messages::append_messages::AppendMessages;
use crate::messages::poll_messages::PollMessages;
-use crate::messages::send_messages::SendMessages;
use crate::models::client_info::{ClientInfo, ClientInfoDetails};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
-use crate::models::messages::PolledMessages;
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
+use crate::models::polled_messages::PolledMessages;
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
@@ -250,7 +250,7 @@
/// Send messages using specified partitioning strategy to the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to send the messages.
- async fn send_messages(&self, command: &mut SendMessages) -> Result<(), IggyError>;
+ async fn send_messages(&self, command: &mut AppendMessages) -> Result<(), IggyError>;
}
/// This trait defines the methods to interact with the consumer offset module.
diff --git a/sdk/src/clients/client.rs b/sdk/src/clients/client.rs
index a0df6c7..35f2b70 100644
--- a/sdk/src/clients/client.rs
+++ b/sdk/src/clients/client.rs
@@ -14,14 +14,14 @@
use crate::error::IggyError;
use crate::identifier::Identifier;
use crate::message_handler::MessageHandler;
+use crate::messages::append_messages::{AppendMessages, Partitioning, PartitioningKind};
use crate::messages::poll_messages::{PollMessages, PollingKind};
-use crate::messages::send_messages::{Partitioning, PartitioningKind, SendMessages};
use crate::models::client_info::{ClientInfo, ClientInfoDetails};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
-use crate::models::messages::{Message, PolledMessages};
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
+use crate::models::polled_messages::{PolledMessage, PolledMessages};
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
@@ -84,7 +84,7 @@
partitioner: Option<Box<dyn Partitioner>>,
encryptor: Option<Box<dyn Encryptor>>,
message_handler: Option<Arc<Box<dyn MessageHandler>>>,
- message_channel_sender: Option<Arc<Sender<Message>>>,
+ message_channel_sender: Option<Arc<Sender<PolledMessage>>>,
}
/// The builder for the `IggyClient` instance, which allows to configure and provide custom implementations for the partitioner, encryptor or message handler.
@@ -134,7 +134,7 @@
#[derive(Debug)]
struct SendMessagesBatch {
- pub commands: VecDeque<SendMessages>,
+ pub commands: VecDeque<AppendMessages>,
}
/// The optional configuration for the `IggyClient` instance, consisting of the optional configuration for sending and polling the messages in the background.
@@ -265,7 +265,7 @@
}
/// Returns the channel receiver for the messages which are polled in the background. This will only work if the `start_polling_messages` method is called.
- pub fn subscribe_to_polled_messages(&mut self) -> Receiver<Message> {
+ pub fn subscribe_to_polled_messages(&mut self) -> Receiver<PolledMessage> {
let (sender, receiver) = flume::unbounded();
self.message_channel_sender = Some(Arc::new(sender));
receiver
@@ -279,7 +279,7 @@
config_override: Option<PollMessagesConfig>,
) -> JoinHandle<()>
where
- F: Fn(Message) + Send + Sync + 'static,
+ F: Fn(PolledMessage) + Send + Sync + 'static,
{
let client = self.client.clone();
let mut interval = Duration::from_millis(100);
@@ -363,7 +363,7 @@
/// Sends the provided messages in the background using the custom partitioner implementation.
pub async fn send_messages_using_partitioner(
&self,
- command: &mut SendMessages,
+ command: &mut AppendMessages,
partitioner: &dyn Partitioner,
) -> Result<(), IggyError> {
let partition_id = partitioner.calculate_partition_id(
@@ -462,7 +462,7 @@
}
while let Some(messages) = batches.pop_front() {
- let mut send_messages = SendMessages {
+ let mut send_messages = AppendMessages {
stream_id: Identifier::from_identifier(&stream_id),
topic_id: Identifier::from_identifier(&topic_id),
partitioning: Partitioning {
@@ -690,7 +690,7 @@
Ok(polled_messages)
}
- async fn send_messages(&self, command: &mut SendMessages) -> Result<(), IggyError> {
+ async fn send_messages(&self, command: &mut AppendMessages) -> Result<(), IggyError> {
if command.messages.is_empty() {
return Ok(());
}
@@ -724,7 +724,7 @@
let mut messages = Vec::with_capacity(command.messages.len());
for message in &command.messages {
- let message = crate::messages::send_messages::Message {
+ let message = crate::messages::append_messages::AppendableMessage {
id: message.id,
length: message.length,
payload: message.payload.clone(),
@@ -732,7 +732,7 @@
};
messages.push(message);
}
- let send_messages = SendMessages {
+ let send_messages = AppendMessages {
stream_id: Identifier::from_identifier(&command.stream_id),
topic_id: Identifier::from_identifier(&command.topic_id),
partitioning: Partitioning::from_partitioning(&command.partitioning),
diff --git a/sdk/src/command.rs b/sdk/src/command.rs
index 6c85c49..29963ed 100644
--- a/sdk/src/command.rs
+++ b/sdk/src/command.rs
@@ -8,8 +8,8 @@
use crate::consumer_offsets::get_consumer_offset::GetConsumerOffset;
use crate::consumer_offsets::store_consumer_offset::StoreConsumerOffset;
use crate::error::IggyError;
+use crate::messages::append_messages::AppendMessages;
use crate::messages::poll_messages::PollMessages;
-use crate::messages::send_messages::SendMessages;
use crate::partitions::create_partitions::CreatePartitions;
use crate::partitions::delete_partitions::DeletePartitions;
use crate::personal_access_tokens::create_personal_access_token::CreatePersonalAccessToken;
@@ -150,7 +150,7 @@
CreatePersonalAccessToken(CreatePersonalAccessToken),
DeletePersonalAccessToken(DeletePersonalAccessToken),
LoginWithPersonalAccessToken(LoginWithPersonalAccessToken),
- SendMessages(SendMessages),
+ SendMessages(AppendMessages),
PollMessages(PollMessages),
GetConsumerOffset(GetConsumerOffset),
StoreConsumerOffset(StoreConsumerOffset),
@@ -291,7 +291,7 @@
LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => Ok(Command::LoginWithPersonalAccessToken(
LoginWithPersonalAccessToken::from_bytes(payload)?,
)),
- SEND_MESSAGES_CODE => Ok(Command::SendMessages(SendMessages::from_bytes(payload)?)),
+ SEND_MESSAGES_CODE => Ok(Command::SendMessages(AppendMessages::from_bytes(payload)?)),
POLL_MESSAGES_CODE => Ok(Command::PollMessages(PollMessages::from_bytes(payload)?)),
STORE_CONSUMER_OFFSET_CODE => Ok(Command::StoreConsumerOffset(
StoreConsumerOffset::from_bytes(payload)?,
@@ -525,9 +525,9 @@
&LoginWithPersonalAccessToken::default(),
);
assert_serialized_as_bytes_and_deserialized_from_bytes(
- &Command::SendMessages(SendMessages::default()),
+ &Command::SendMessages(AppendMessages::default()),
SEND_MESSAGES_CODE,
- &SendMessages::default(),
+ &AppendMessages::default(),
);
assert_serialized_as_bytes_and_deserialized_from_bytes(
&Command::PollMessages(PollMessages::default()),
diff --git a/sdk/src/http/messages.rs b/sdk/src/http/messages.rs
index ee4a5ab..65e2130 100644
--- a/sdk/src/http/messages.rs
+++ b/sdk/src/http/messages.rs
@@ -1,9 +1,9 @@
use crate::client::MessageClient;
use crate::error::IggyError;
use crate::http::client::HttpClient;
+use crate::messages::append_messages::AppendMessages;
use crate::messages::poll_messages::PollMessages;
-use crate::messages::send_messages::SendMessages;
-use crate::models::messages::PolledMessages;
+use crate::models::polled_messages::PolledMessages;
use async_trait::async_trait;
#[async_trait]
@@ -22,7 +22,7 @@
Ok(messages)
}
- async fn send_messages(&self, command: &mut SendMessages) -> Result<(), IggyError> {
+ async fn send_messages(&self, command: &mut AppendMessages) -> Result<(), IggyError> {
self.post(
&get_path(
&command.stream_id.as_string(),
diff --git a/sdk/src/message_handler.rs b/sdk/src/message_handler.rs
index 3b5295d..84e54ac 100644
--- a/sdk/src/message_handler.rs
+++ b/sdk/src/message_handler.rs
@@ -1,7 +1,7 @@
-use crate::models::messages::Message;
+use crate::models::polled_messages::PolledMessage;
use std::fmt::Debug;
/// The trait represent the logic responsible for handling the message and is used by the `IggyClient`.
pub trait MessageHandler: Send + Sync + Debug {
- fn handle(&self, message: Message);
+ fn handle(&self, message: PolledMessage);
}
diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/append_messages.rs
similarity index 92%
rename from sdk/src/messages/send_messages.rs
rename to sdk/src/messages/append_messages.rs
index 3349411..96d0698 100644
--- a/sdk/src/messages/send_messages.rs
+++ b/sdk/src/messages/append_messages.rs
@@ -23,7 +23,7 @@
/// - `partitioning` - to which partition the messages should be sent - either provided by the client or calculated by the server.
/// - `messages` - collection of messages to be sent.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
-pub struct SendMessages {
+pub struct AppendMessages {
/// Unique stream ID (numeric or name).
#[serde(skip)]
pub stream_id: Identifier,
@@ -33,7 +33,7 @@
/// To which partition the messages should be sent - either provided by the client or calculated by the server.
pub partitioning: Partitioning,
/// Collection of messages to be sent.
- pub messages: Vec<Message>,
+ pub messages: Vec<AppendableMessage>,
}
/// `Partitioning` is used to specify to which partition the messages should be sent.
@@ -54,14 +54,14 @@
pub value: Vec<u8>,
}
-/// The single message to be sent. It has the following payload:
+/// The single message to be sent (appended). It has the following payload:
/// - `id` - unique message ID, if not specified by the client (has value = 0), it will be generated by the server.
/// - `length` - length of the payload.
/// - `payload` - binary message payload.
/// - `headers` - optional collection of headers.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
-pub struct Message {
+pub struct AppendableMessage {
/// Unique message ID, if not specified by the client (has value = 0), it will be generated by the server.
#[serde(default = "default_message_id")]
pub id: u128,
@@ -92,13 +92,13 @@
0
}
-impl Default for SendMessages {
+impl Default for AppendMessages {
fn default() -> Self {
- SendMessages {
+ AppendMessages {
stream_id: Identifier::default(),
topic_id: Identifier::default(),
partitioning: Partitioning::default(),
- messages: vec![Message::default()],
+ messages: vec![AppendableMessage::default()],
}
}
}
@@ -190,9 +190,9 @@
}
}
-impl CommandPayload for SendMessages {}
+impl CommandPayload for AppendMessages {}
-impl Validatable<IggyError> for SendMessages {
+impl Validatable<IggyError> for AppendMessages {
fn validate(&self) -> Result<(), IggyError> {
if self.messages.is_empty() {
return Err(IggyError::InvalidMessagesCount);
@@ -251,14 +251,14 @@
}
}
-impl Message {
+impl AppendableMessage {
/// Create a new message with the optional ID, payload and headers.
pub fn new(
id: Option<u128>,
payload: Bytes,
headers: Option<HashMap<HeaderKey, HeaderValue>>,
) -> Self {
- Message {
+ AppendableMessage {
id: id.unwrap_or(0),
#[allow(clippy::cast_possible_truncation)]
length: payload.len() as u32,
@@ -274,10 +274,10 @@
}
}
-impl Default for Message {
+impl Default for AppendableMessage {
fn default() -> Self {
let payload = Bytes::from("hello world");
- Message {
+ AppendableMessage {
id: 0,
length: payload.len() as u32,
payload,
@@ -286,7 +286,7 @@
}
}
-impl Display for Message {
+impl Display for AppendableMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}|{}", self.id, String::from_utf8_lossy(&self.payload))
}
@@ -324,7 +324,7 @@
}
}
-impl BytesSerializable for Message {
+impl BytesSerializable for AppendableMessage {
fn as_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(self.get_size_bytes() as usize);
bytes.put_u128_le(self.id);
@@ -371,7 +371,7 @@
return Err(IggyError::InvalidMessagePayloadLength);
}
- Ok(Message {
+ Ok(AppendableMessage {
id,
length: payload_length,
payload,
@@ -380,7 +380,7 @@
}
}
-impl FromStr for Message {
+impl FromStr for AppendableMessage {
type Err = IggyError;
fn from_str(input: &str) -> Result<Self, Self::Err> {
let id = default_message_id();
@@ -390,7 +390,7 @@
return Err(IggyError::EmptyMessagePayload);
}
- Ok(Message {
+ Ok(AppendableMessage {
id,
length,
payload,
@@ -399,12 +399,12 @@
}
}
-impl BytesSerializable for SendMessages {
+impl BytesSerializable for AppendMessages {
fn as_bytes(&self) -> Vec<u8> {
let messages_size = self
.messages
.iter()
- .map(Message::get_size_bytes)
+ .map(AppendableMessage::get_size_bytes)
.sum::<u32>();
let key_bytes = self.partitioning.as_bytes();
@@ -423,7 +423,7 @@
bytes
}
- fn from_bytes(bytes: &[u8]) -> Result<SendMessages, IggyError> {
+ fn from_bytes(bytes: &[u8]) -> Result<AppendMessages, IggyError> {
if bytes.len() < 11 {
return Err(IggyError::InvalidCommand);
}
@@ -439,12 +439,12 @@
position = 0;
let mut messages = Vec::new();
while position < messages_payloads.len() {
- let message = Message::from_bytes(&messages_payloads[position..])?;
+ let message = AppendableMessage::from_bytes(&messages_payloads[position..])?;
position += message.get_size_bytes() as usize;
messages.push(message);
}
- let command = SendMessages {
+ let command = AppendMessages {
stream_id,
topic_id,
partitioning: key,
@@ -455,7 +455,7 @@
}
}
-impl Display for SendMessages {
+impl Display for AppendMessages {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
@@ -505,11 +505,11 @@
#[test]
fn should_be_serialized_as_bytes() {
- let message_1 = Message::from_str("hello 1").unwrap();
- let message_2 = Message::new(Some(2), "hello 2".into(), None);
- let message_3 = Message::new(Some(3), "hello 3".into(), None);
+ let message_1 = AppendableMessage::from_str("hello 1").unwrap();
+ let message_2 = AppendableMessage::new(Some(2), "hello 2".into(), None);
+ let message_3 = AppendableMessage::new(Some(3), "hello 3".into(), None);
let messages = vec![message_1, message_2, message_3];
- let command = SendMessages {
+ let command = AppendMessages {
stream_id: Identifier::numeric(1).unwrap(),
topic_id: Identifier::numeric(2).unwrap(),
partitioning: Partitioning::partition_id(4),
@@ -546,9 +546,9 @@
let topic_id = Identifier::numeric(2).unwrap();
let key = Partitioning::partition_id(4);
- let message_1 = Message::from_str("hello 1").unwrap();
- let message_2 = Message::new(Some(2), "hello 2".into(), None);
- let message_3 = Message::new(Some(3), "hello 3".into(), None);
+ let message_1 = AppendableMessage::from_str("hello 1").unwrap();
+ let message_2 = AppendableMessage::new(Some(2), "hello 2".into(), None);
+ let message_3 = AppendableMessage::new(Some(3), "hello 3".into(), None);
let messages = [
message_1.as_bytes(),
message_2.as_bytes(),
@@ -566,14 +566,14 @@
bytes.extend(key_bytes);
bytes.extend(messages);
- let command = SendMessages::from_bytes(&bytes);
+ let command = AppendMessages::from_bytes(&bytes);
assert!(command.is_ok());
let messages_payloads = &bytes[current_position..];
let mut position = 0;
let mut messages = Vec::new();
while position < messages_payloads.len() {
- let message = Message::from_bytes(&messages_payloads[position..]).unwrap();
+ let message = AppendableMessage::from_bytes(&messages_payloads[position..]).unwrap();
position += message.get_size_bytes() as usize;
messages.push(message);
}
diff --git a/sdk/src/messages/mod.rs b/sdk/src/messages/mod.rs
index d559e03..d9f8486 100644
--- a/sdk/src/messages/mod.rs
+++ b/sdk/src/messages/mod.rs
@@ -1,5 +1,5 @@
+pub mod append_messages;
pub mod poll_messages;
-pub mod send_messages;
const MAX_HEADERS_SIZE: u32 = 100 * 1000;
pub const MAX_PAYLOAD_SIZE: u32 = 10 * 1000 * 1000;
diff --git a/sdk/src/models/mod.rs b/sdk/src/models/mod.rs
index c147ac5..7e1a44d 100644
--- a/sdk/src/models/mod.rs
+++ b/sdk/src/models/mod.rs
@@ -3,10 +3,10 @@
pub mod consumer_offset_info;
pub mod header;
pub mod identity_info;
-pub mod messages;
pub mod partition;
pub mod permissions;
pub mod personal_access_token;
+pub mod polled_messages;
pub mod stats;
pub mod stream;
pub mod topic;
diff --git a/sdk/src/models/messages.rs b/sdk/src/models/polled_messages.rs
similarity index 94%
rename from sdk/src/models/messages.rs
rename to sdk/src/models/polled_messages.rs
index a512b94..91c6a56 100644
--- a/sdk/src/models/messages.rs
+++ b/sdk/src/models/polled_messages.rs
@@ -1,6 +1,6 @@
use crate::bytes_serializable::BytesSerializable;
use crate::error::IggyError;
-use crate::messages::send_messages;
+use crate::messages::append_messages;
use crate::models::header;
use crate::models::header::{HeaderKey, HeaderValue};
use crate::sizeable::Sizeable;
@@ -26,7 +26,7 @@
/// The current offset of the partition.
pub current_offset: u64,
/// The collection of messages.
- pub messages: Vec<Message>,
+ pub messages: Vec<PolledMessage>,
}
/// The single message that is polled from the partition.
@@ -41,7 +41,7 @@
/// - `payload`: the binary payload of the message.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
-pub struct Message {
+pub struct PolledMessage {
/// The offset of the message.
pub offset: u64,
/// The state of the message.
@@ -123,15 +123,15 @@
}
}
-impl Sizeable for Arc<Message> {
+impl Sizeable for Arc<PolledMessage> {
fn get_size_bytes(&self) -> u32 {
self.as_ref().get_size_bytes()
}
}
-impl Message {
+impl PolledMessage {
/// Creates a new message from the `Message` struct being part of `SendMessages` command.
- pub fn from_message(message: &send_messages::Message) -> Self {
+ pub fn from_message(message: &append_messages::AppendableMessage) -> Self {
let timestamp = IggyTimestamp::now().to_micros();
let checksum = checksum::calculate(&message.payload);
let headers = message.headers.as_ref().cloned();
@@ -155,7 +155,7 @@
checksum: u32,
headers: Option<HashMap<HeaderKey, HeaderValue>>,
) -> Self {
- Message::create(0, state, timestamp, id, payload, checksum, headers)
+ PolledMessage::create(0, state, timestamp, id, payload, checksum, headers)
}
/// Creates a new message with a specified offset.
@@ -168,7 +168,7 @@
checksum: u32,
headers: Option<HashMap<HeaderKey, HeaderValue>>,
) -> Self {
- Message {
+ PolledMessage {
offset,
state,
timestamp,
diff --git a/sdk/src/partitioner.rs b/sdk/src/partitioner.rs
index eac0dbb..a29b154 100644
--- a/sdk/src/partitioner.rs
+++ b/sdk/src/partitioner.rs
@@ -1,6 +1,6 @@
use crate::error::IggyError;
use crate::identifier::Identifier;
-use crate::messages::send_messages::{Message, Partitioning};
+use crate::messages::append_messages::{AppendableMessage, Partitioning};
use std::fmt::Debug;
/// The trait represent the logic responsible for calculating the partition ID and is used by the `IggyClient`.
@@ -11,6 +11,6 @@
stream_id: &Identifier,
topic_id: &Identifier,
partitioning: &Partitioning,
- messages: &[Message],
+ messages: &[AppendableMessage],
) -> Result<u32, IggyError>;
}
diff --git a/server/src/binary/handlers/messages/send_messages_handler.rs b/server/src/binary/handlers/messages/send_messages_handler.rs
index 2ec1420..8aff95e 100644
--- a/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -3,11 +3,11 @@
use crate::streaming::systems::system::SharedSystem;
use anyhow::Result;
use iggy::error::IggyError;
-use iggy::messages::send_messages::SendMessages;
+use iggy::messages::append_messages::AppendMessages;
use tracing::debug;
pub async fn handle(
- command: &SendMessages,
+ command: &AppendMessages,
sender: &mut dyn Sender,
session: &Session,
system: &SharedSystem,
diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs
index 5c8b288..937dec3 100644
--- a/server/src/http/messages.rs
+++ b/server/src/http/messages.rs
@@ -10,8 +10,8 @@
use axum::routing::get;
use axum::{Extension, Json, Router};
use iggy::identifier::Identifier;
+use iggy::messages::append_messages::AppendMessages;
use iggy::messages::poll_messages::PollMessages;
-use iggy::messages::send_messages::SendMessages;
use iggy::validatable::Validatable;
use std::sync::Arc;
@@ -54,7 +54,7 @@
State(state): State<Arc<AppState>>,
Extension(identity): Extension<Identity>,
Path((stream_id, topic_id)): Path<(String, String)>,
- Json(mut command): Json<SendMessages>,
+ Json(mut command): Json<AppendMessages>,
) -> Result<StatusCode, CustomError> {
command.stream_id = Identifier::from_str_value(&stream_id)?;
command.topic_id = Identifier::from_str_value(&topic_id)?;
diff --git a/server/src/streaming/models/messages.rs b/server/src/streaming/models/messages.rs
index 848b5d3..0fdffee 100644
--- a/server/src/streaming/models/messages.rs
+++ b/server/src/streaming/models/messages.rs
@@ -1,4 +1,4 @@
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
@@ -7,5 +7,5 @@
pub struct PolledMessages {
pub partition_id: u32,
pub current_offset: u64,
- pub messages: Vec<Arc<Message>>,
+ pub messages: Vec<Arc<PolledMessage>>,
}
diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs
index 8489996..4b7d1f2 100644
--- a/server/src/streaming/partitions/messages.rs
+++ b/server/src/streaming/partitions/messages.rs
@@ -3,11 +3,11 @@
use crate::streaming::segments::segment::Segment;
use crate::streaming::utils::random_id;
use iggy::error::IggyError;
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use std::sync::Arc;
use tracing::{trace, warn};
-const EMPTY_MESSAGES: Vec<Arc<Message>> = vec![];
+const EMPTY_MESSAGES: Vec<Arc<PolledMessage>> = vec![];
impl Partition {
pub fn get_messages_count(&self) -> u64 {
@@ -29,7 +29,7 @@
&self,
timestamp: u64,
count: u32,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
trace!(
"Getting messages by timestamp: {} for partition: {}...",
timestamp,
@@ -86,7 +86,7 @@
&self,
start_offset: u64,
count: u32,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
trace!(
"Getting messages for start offset: {} for partition: {}...",
start_offset,
@@ -114,11 +114,17 @@
}
}
- pub async fn get_first_messages(&self, count: u32) -> Result<Vec<Arc<Message>>, IggyError> {
+ pub async fn get_first_messages(
+ &self,
+ count: u32,
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
self.get_messages_by_offset(0, count).await
}
- pub async fn get_last_messages(&self, count: u32) -> Result<Vec<Arc<Message>>, IggyError> {
+ pub async fn get_last_messages(
+ &self,
+ count: u32,
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
let mut count = count as u64;
if count > self.current_offset + 1 {
count = self.current_offset + 1
@@ -133,7 +139,7 @@
&self,
consumer: PollingConsumer,
count: u32,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
let (consumer_offsets, consumer_id) = match consumer {
PollingConsumer::Consumer(consumer_id, _) => (&self.consumer_offsets, consumer_id),
PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
@@ -199,7 +205,7 @@
segments: Vec<&Segment>,
offset: u64,
count: u32,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
let mut messages = Vec::with_capacity(segments.len());
for segment in segments {
let segment_messages = segment.get_messages(offset, count).await?;
@@ -215,7 +221,7 @@
&self,
start_offset: u64,
end_offset: u64,
- ) -> Option<Vec<Arc<Message>>> {
+ ) -> Option<Vec<Arc<PolledMessage>>> {
let cache = self.cache.as_ref()?;
if cache.is_empty() || start_offset > end_offset || end_offset > self.current_offset {
return None;
@@ -238,7 +244,7 @@
pub async fn get_newest_messages_by_size(
&self,
size_bytes: u32,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
trace!(
"Getting messages for size: {} bytes for partition: {}...",
size_bytes,
@@ -272,7 +278,11 @@
Ok(messages)
}
- fn load_messages_from_cache(&self, start_offset: u64, end_offset: u64) -> Vec<Arc<Message>> {
+ fn load_messages_from_cache(
+ &self,
+ start_offset: u64,
+ end_offset: u64,
+ ) -> Vec<Arc<PolledMessage>> {
trace!(
"Loading messages from cache, start offset: {}, end offset: {}...",
start_offset,
@@ -317,7 +327,7 @@
messages
}
- pub async fn append_messages(&mut self, messages: Vec<Message>) -> Result<(), IggyError> {
+ pub async fn append_messages(&mut self, messages: Vec<PolledMessage>) -> Result<(), IggyError> {
{
let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?;
if last_segment.is_closed {
diff --git a/server/src/streaming/partitions/mod.rs b/server/src/streaming/partitions/mod.rs
index b83c034..72d1f4a 100644
--- a/server/src/streaming/partitions/mod.rs
+++ b/server/src/streaming/partitions/mod.rs
@@ -1,5 +1,5 @@
use bytes::Bytes;
-use iggy::models::messages::{Message, MessageState};
+use iggy::models::polled_messages::{MessageState, PolledMessage};
use iggy::utils::checksum;
pub mod consumer_offsets;
@@ -10,7 +10,7 @@
pub mod storage;
#[allow(dead_code)]
-fn create_messages() -> Vec<Message> {
+fn create_messages() -> Vec<PolledMessage> {
vec![
create_message(0, 1, "message 1"),
create_message(1, 2, "message 2"),
@@ -21,10 +21,10 @@
]
}
-fn create_message(offset: u64, id: u128, payload: &str) -> Message {
+fn create_message(offset: u64, id: u128, payload: &str) -> PolledMessage {
let payload = Bytes::from(payload.to_string());
let checksum = checksum::calculate(payload.as_ref());
- Message::create(
+ PolledMessage::create(
offset,
MessageState::Available,
1,
diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs
index 9451395..d9c6ed6 100644
--- a/server/src/streaming/partitions/partition.rs
+++ b/server/src/streaming/partitions/partition.rs
@@ -6,7 +6,7 @@
use crate::streaming::storage::SystemStorage;
use dashmap::DashMap;
use iggy::consumer::ConsumerKind;
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use iggy::utils::timestamp::IggyTimestamp;
use std::sync::Arc;
@@ -17,7 +17,7 @@
pub partition_id: u32,
pub path: String,
pub current_offset: u64,
- pub cache: Option<SmartCache<Arc<Message>>>,
+ pub cache: Option<SmartCache<Arc<PolledMessage>>>,
pub cached_memory_tracker: Option<Arc<CacheMemoryTracker>>,
pub message_deduplicator: Option<MessageDeduplicator>,
pub unsaved_messages_count: u32,
diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs
index 4122e4a..fe8746f 100644
--- a/server/src/streaming/segments/messages.rs
+++ b/server/src/streaming/segments/messages.rs
@@ -2,11 +2,11 @@
use crate::streaming::segments::segment::Segment;
use crate::streaming::segments::time_index::TimeIndex;
use iggy::error::IggyError;
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use std::sync::Arc;
use tracing::trace;
-const EMPTY_MESSAGES: Vec<Arc<Message>> = vec![];
+const EMPTY_MESSAGES: Vec<Arc<PolledMessage>> = vec![];
impl Segment {
pub fn get_messages_count(&self) -> u64 {
@@ -21,7 +21,7 @@
&self,
mut offset: u64,
count: u32,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
if count == 0 {
return Ok(EMPTY_MESSAGES);
}
@@ -62,7 +62,7 @@
Ok(messages)
}
- pub async fn get_all_messages(&self) -> Result<Vec<Arc<Message>>, IggyError> {
+ pub async fn get_all_messages(&self) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
self.get_messages(self.start_offset, self.get_messages_count() as u32)
.await
}
@@ -70,7 +70,7 @@
pub async fn get_newest_messages_by_size(
&self,
size_bytes: u64,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
let messages = self
.storage
.segment
@@ -80,21 +80,25 @@
Ok(messages)
}
- fn load_messages_from_unsaved_buffer(&self, offset: u64, end_offset: u64) -> Vec<Arc<Message>> {
+ fn load_messages_from_unsaved_buffer(
+ &self,
+ offset: u64,
+ end_offset: u64,
+ ) -> Vec<Arc<PolledMessage>> {
self.unsaved_messages
.as_ref()
.unwrap()
.iter()
.filter(|message| message.offset >= offset && message.offset <= end_offset)
.cloned()
- .collect::<Vec<Arc<Message>>>()
+ .collect::<Vec<Arc<PolledMessage>>>()
}
async fn load_messages_from_disk(
&self,
start_offset: u64,
end_offset: u64,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
trace!(
"Loading messages from disk, segment start offset: {}, end offset: {}, current offset: {}...",
start_offset,
@@ -160,7 +164,7 @@
async fn load_messages_from_segment_file(
&self,
index_range: &IndexRange,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
let messages = self
.storage
.segment
@@ -176,7 +180,10 @@
Ok(messages)
}
- pub async fn append_messages(&mut self, messages: &[Arc<Message>]) -> Result<(), IggyError> {
+ pub async fn append_messages(
+ &mut self,
+ messages: &[Arc<PolledMessage>],
+ ) -> Result<(), IggyError> {
if self.is_closed {
return Err(IggyError::SegmentClosed(
self.start_offset,
diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs
index 618cdf5..b3cd20f 100644
--- a/server/src/streaming/segments/segment.rs
+++ b/server/src/streaming/segments/segment.rs
@@ -2,7 +2,7 @@
use crate::streaming::segments::index::Index;
use crate::streaming::segments::time_index::TimeIndex;
use crate::streaming::storage::SystemStorage;
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use iggy::utils::timestamp::IggyTimestamp;
use std::sync::Arc;
@@ -25,7 +25,7 @@
pub current_size_bytes: u32,
pub is_closed: bool,
pub(crate) message_expiry: Option<u32>,
- pub(crate) unsaved_messages: Option<Vec<Arc<Message>>>,
+ pub(crate) unsaved_messages: Option<Vec<Arc<PolledMessage>>>,
pub(crate) config: Arc<SystemConfig>,
pub(crate) indexes: Option<Vec<Index>>,
pub(crate) time_indexes: Option<Vec<TimeIndex>>,
diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs
index 7e48fa5..e88e74a 100644
--- a/server/src/streaming/segments/storage.rs
+++ b/server/src/streaming/segments/storage.rs
@@ -9,7 +9,7 @@
use bytes::{BufMut, Bytes};
use iggy::bytes_serializable::BytesSerializable;
use iggy::error::IggyError;
-use iggy::models::messages::{Message, MessageState};
+use iggy::models::polled_messages::{MessageState, PolledMessage};
use iggy::utils::checksum;
use std::collections::HashMap;
use std::io::SeekFrom;
@@ -171,11 +171,11 @@
&self,
segment: &Segment,
index_range: &IndexRange,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
let mut messages = Vec::with_capacity(
1 + (index_range.end.relative_offset - index_range.start.relative_offset) as usize,
);
- load_messages_by_range(segment, index_range, |message: Message| {
+ load_messages_by_range(segment, index_range, |message: PolledMessage| {
messages.push(Arc::new(message));
Ok(())
})
@@ -188,10 +188,10 @@
&self,
segment: &Segment,
size_bytes: u64,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
let mut messages = Vec::new();
let mut total_size_bytes = 0;
- load_messages_by_size(segment, size_bytes, |message: Message| {
+ load_messages_by_size(segment, size_bytes, |message: PolledMessage| {
total_size_bytes += message.get_size_bytes() as u64;
messages.push(Arc::new(message));
Ok(())
@@ -208,7 +208,7 @@
async fn save_messages(
&self,
segment: &Segment,
- messages: &[Arc<Message>],
+ messages: &[Arc<PolledMessage>],
) -> Result<u32, IggyError> {
let messages_size = messages
.iter()
@@ -234,33 +234,41 @@
async fn load_message_ids(&self, segment: &Segment) -> Result<Vec<u128>, IggyError> {
let mut message_ids = Vec::new();
- load_messages_by_range(segment, &IndexRange::max_range(), |message: Message| {
- message_ids.push(message.id);
- Ok(())
- })
+ load_messages_by_range(
+ segment,
+ &IndexRange::max_range(),
+ |message: PolledMessage| {
+ message_ids.push(message.id);
+ Ok(())
+ },
+ )
.await?;
trace!("Loaded {} message IDs from disk.", message_ids.len());
Ok(message_ids)
}
async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError> {
- load_messages_by_range(segment, &IndexRange::max_range(), |message: Message| {
- let calculated_checksum = checksum::calculate(&message.payload);
- trace!(
- "Loaded message for offset: {}, checksum: {}, expected: {}",
- message.offset,
- calculated_checksum,
- message.checksum
- );
- if calculated_checksum != message.checksum {
- return Err(IggyError::InvalidMessageChecksum(
- calculated_checksum,
- message.checksum,
+ load_messages_by_range(
+ segment,
+ &IndexRange::max_range(),
+ |message: PolledMessage| {
+ let calculated_checksum = checksum::calculate(&message.payload);
+ trace!(
+ "Loaded message for offset: {}, checksum: {}, expected: {}",
message.offset,
- ));
- }
- Ok(())
- })
+ calculated_checksum,
+ message.checksum
+ );
+ if calculated_checksum != message.checksum {
+ return Err(IggyError::InvalidMessageChecksum(
+ calculated_checksum,
+ message.checksum,
+ message.offset,
+ ));
+ }
+ Ok(())
+ },
+ )
.await?;
Ok(())
}
@@ -399,7 +407,7 @@
&self,
segment: &Segment,
mut current_position: u32,
- messages: &[Arc<Message>],
+ messages: &[Arc<PolledMessage>],
) -> Result<(), IggyError> {
let mut bytes = Vec::with_capacity(messages.len() * 4);
for message in messages {
@@ -492,7 +500,7 @@
async fn save_time_index(
&self,
segment: &Segment,
- messages: &[Arc<Message>],
+ messages: &[Arc<PolledMessage>],
) -> Result<(), IggyError> {
let mut bytes = Vec::with_capacity(messages.len() * 8);
for message in messages {
@@ -520,7 +528,7 @@
async fn load_messages_by_range(
segment: &Segment,
index_range: &IndexRange,
- mut on_message: impl FnMut(Message) -> Result<(), IggyError>,
+ mut on_message: impl FnMut(PolledMessage) -> Result<(), IggyError>,
) -> Result<(), IggyError> {
let file = file::open(&segment.log_path).await?;
let file_size = file.metadata().await?.len();
@@ -602,7 +610,7 @@
let id = id.unwrap();
let checksum = checksum.unwrap();
- let message = Message::create(
+ let message = PolledMessage::create(
offset,
state,
timestamp,
@@ -620,7 +628,7 @@
async fn load_messages_by_size(
segment: &Segment,
size_bytes: u64,
- mut on_message: impl FnMut(Message) -> Result<(), IggyError>,
+ mut on_message: impl FnMut(PolledMessage) -> Result<(), IggyError>,
) -> Result<(), IggyError> {
let file = file::open(&segment.log_path).await?;
let file_size = file.metadata().await?.len();
@@ -693,7 +701,7 @@
let id = id.unwrap();
let checksum = checksum.unwrap();
- let message = Message::create(
+ let message = PolledMessage::create(
offset,
state,
timestamp,
diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs
index d868128..4f58fe4 100644
--- a/server/src/streaming/storage.rs
+++ b/server/src/streaming/storage.rs
@@ -19,7 +19,7 @@
use async_trait::async_trait;
use iggy::consumer::ConsumerKind;
use iggy::error::IggyError;
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use iggy::models::user_info::UserId;
use sled::Db;
use std::fmt::{Debug, Formatter};
@@ -98,16 +98,16 @@
&self,
segment: &Segment,
index_range: &IndexRange,
- ) -> Result<Vec<Arc<Message>>, IggyError>;
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError>;
async fn load_newest_messages_by_size(
&self,
segment: &Segment,
size_bytes: u64,
- ) -> Result<Vec<Arc<Message>>, IggyError>;
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError>;
async fn save_messages(
&self,
segment: &Segment,
- messages: &[Arc<Message>],
+ messages: &[Arc<PolledMessage>],
) -> Result<u32, IggyError>;
async fn load_message_ids(&self, segment: &Segment) -> Result<Vec<u128>, IggyError>;
async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError>;
@@ -123,7 +123,7 @@
&self,
segment: &Segment,
current_position: u32,
- messages: &[Arc<Message>],
+ messages: &[Arc<PolledMessage>],
) -> Result<(), IggyError>;
async fn load_all_time_indexes(&self, segment: &Segment) -> Result<Vec<TimeIndex>, IggyError>;
async fn load_last_time_index(&self, segment: &Segment)
@@ -131,7 +131,7 @@
async fn save_time_index(
&self,
segment: &Segment,
- messages: &[Arc<Message>],
+ messages: &[Arc<PolledMessage>],
) -> Result<(), IggyError>;
}
@@ -212,7 +212,7 @@
use crate::streaming::streams::stream::Stream;
use crate::streaming::topics::topic::Topic;
use async_trait::async_trait;
- use iggy::models::messages::Message;
+ use iggy::models::polled_messages::PolledMessage;
use std::sync::Arc;
struct TestSystemInfoStorage {}
@@ -446,7 +446,7 @@
&self,
_segment: &Segment,
_index_range: &IndexRange,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
Ok(vec![])
}
@@ -454,14 +454,14 @@
&self,
_segment: &Segment,
_size: u64,
- ) -> Result<Vec<Arc<Message>>, IggyError> {
+ ) -> Result<Vec<Arc<PolledMessage>>, IggyError> {
Ok(vec![])
}
async fn save_messages(
&self,
_segment: &Segment,
- _messages: &[Arc<Message>],
+ _messages: &[Arc<PolledMessage>],
) -> Result<u32, IggyError> {
Ok(0)
}
@@ -492,7 +492,7 @@
&self,
_segment: &Segment,
_current_position: u32,
- _messages: &[Arc<Message>],
+ _messages: &[Arc<PolledMessage>],
) -> Result<(), IggyError> {
Ok(())
}
@@ -514,7 +514,7 @@
async fn save_time_index(
&self,
_segment: &Segment,
- _messages: &[Arc<Message>],
+ _messages: &[Arc<PolledMessage>],
) -> Result<(), IggyError> {
Ok(())
}
diff --git a/server/src/streaming/systems/messages.rs b/server/src/streaming/systems/messages.rs
index bc0d253..9aebc7a 100644
--- a/server/src/streaming/systems/messages.rs
+++ b/server/src/streaming/systems/messages.rs
@@ -6,10 +6,10 @@
use bytes::Bytes;
use iggy::error::IggyError;
use iggy::identifier::Identifier;
+use iggy::messages::append_messages;
+use iggy::messages::append_messages::Partitioning;
use iggy::messages::poll_messages::PollingStrategy;
-use iggy::messages::send_messages;
-use iggy::messages::send_messages::Partitioning;
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use std::sync::Arc;
use tracing::{error, trace};
@@ -71,7 +71,7 @@
let payload = encryptor.decrypt(&message.payload);
match payload {
Ok(payload) => {
- decrypted_messages.push(Arc::new(Message {
+ decrypted_messages.push(Arc::new(PolledMessage {
id: message.id,
state: message.state,
offset: message.offset,
@@ -100,7 +100,7 @@
stream_id: &Identifier,
topic_id: &Identifier,
partitioning: &Partitioning,
- messages: &Vec<send_messages::Message>,
+ messages: &Vec<append_messages::AppendableMessage>,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
let stream = self.get_stream(stream_id)?;
@@ -120,7 +120,7 @@
let message = match self.encryptor {
Some(ref encryptor) => {
let payload = encryptor.encrypt(message.payload.as_ref())?;
- encrypted_message = send_messages::Message {
+ encrypted_message = append_messages::AppendableMessage {
id: message.id,
length: payload.len() as u32,
payload: Bytes::from(payload),
@@ -131,7 +131,7 @@
None => message,
};
batch_size_bytes += message.get_size_bytes() as u64;
- received_messages.push(Message::from_message(message));
+ received_messages.push(PolledMessage::from_message(message));
}
// If there's enough space in cache, do nothing.
diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs
index e0ee6d0..ffc56f4 100644
--- a/server/src/streaming/topics/messages.rs
+++ b/server/src/streaming/topics/messages.rs
@@ -4,9 +4,9 @@
use crate::streaming::utils::file::folder_size;
use crate::streaming::utils::hash;
use iggy::error::IggyError;
+use iggy::messages::append_messages::{Partitioning, PartitioningKind};
use iggy::messages::poll_messages::{PollingKind, PollingStrategy};
-use iggy::messages::send_messages::{Partitioning, PartitioningKind};
-use iggy::models::messages::Message;
+use iggy::models::polled_messages::PolledMessage;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -63,7 +63,7 @@
pub async fn append_messages(
&self,
partitioning: &Partitioning,
- messages: Vec<Message>,
+ messages: Vec<PolledMessage>,
) -> Result<(), IggyError> {
if !self.has_partitions() {
return Err(IggyError::NoPartitions(self.topic_id, self.stream_id));
@@ -90,7 +90,7 @@
async fn append_messages_to_partition(
&self,
partition_id: u32,
- messages: Vec<Message>,
+ messages: Vec<PolledMessage>,
) -> Result<(), IggyError> {
let partition = self.partitions.get(&partition_id);
if partition.is_none() {
@@ -205,7 +205,7 @@
Ok(())
}
- fn cache_integrity_check(cache: &[Arc<Message>]) -> bool {
+ fn cache_integrity_check(cache: &[Arc<PolledMessage>]) -> bool {
if cache.is_empty() {
warn!("Cache is empty!");
return false;
@@ -261,7 +261,7 @@
use crate::configs::system::SystemConfig;
use crate::streaming::storage::tests::get_test_system_storage;
use bytes::Bytes;
- use iggy::models::messages::MessageState;
+ use iggy::models::polled_messages::MessageState;
use std::sync::Arc;
#[tokio::test]
@@ -274,7 +274,7 @@
for entity_id in 1..=messages_count {
let payload = Bytes::from("test");
- let messages = vec![Message::empty(
+ let messages = vec![PolledMessage::empty(
1,
MessageState::Available,
entity_id as u128,
@@ -310,7 +310,7 @@
for entity_id in 1..=messages_count {
let partitioning = Partitioning::messages_key_u32(entity_id);
let payload = Bytes::from("test");
- let messages = vec![Message::empty(
+ let messages = vec![PolledMessage::empty(
1,
MessageState::Available,
entity_id as u128,
diff --git a/tools/src/data-seeder/seeder.rs b/tools/src/data-seeder/seeder.rs
index 2b5d7c9..26de429 100644
--- a/tools/src/data-seeder/seeder.rs
+++ b/tools/src/data-seeder/seeder.rs
@@ -2,7 +2,7 @@
use iggy::clients::client::IggyClient;
use iggy::error::IggyError;
use iggy::identifier::Identifier;
-use iggy::messages::send_messages::{Message, Partitioning, SendMessages};
+use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning};
use iggy::models::header::{HeaderKey, HeaderValue};
use iggy::streams::create_stream::CreateStream;
use iggy::topics::create_topic::CreateTopic;
@@ -142,13 +142,13 @@
Some(headers)
}
};
- let mut message = Message::from_str(&payload)?;
+ let mut message = AppendableMessage::from_str(&payload)?;
message.headers = headers;
messages.push(message);
message_id += 1;
}
client
- .send_messages(&mut SendMessages {
+ .send_messages(&mut AppendMessages {
stream_id: Identifier::numeric(stream_id)?,
topic_id: Identifier::numeric(topic.id)?,
partitioning: Partitioning::balanced(),