| use bytes::Bytes; |
| use iggy::client::{ |
| ConsumerGroupClient, ConsumerOffsetClient, MessageClient, PartitionClient, StreamClient, |
| SystemClient, TopicClient, UserClient, |
| }; |
| use iggy::clients::client::{IggyClient, IggyClientConfig}; |
| use iggy::consumer::{Consumer, ConsumerKind}; |
| use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup; |
| use iggy::consumer_groups::delete_consumer_group::DeleteConsumerGroup; |
| use iggy::consumer_groups::get_consumer_group::GetConsumerGroup; |
| use iggy::consumer_groups::get_consumer_groups::GetConsumerGroups; |
| use iggy::consumer_groups::join_consumer_group::JoinConsumerGroup; |
| use iggy::consumer_groups::leave_consumer_group::LeaveConsumerGroup; |
| use iggy::consumer_offsets::get_consumer_offset::GetConsumerOffset; |
| use iggy::consumer_offsets::store_consumer_offset::StoreConsumerOffset; |
| use iggy::error::IggyError; |
| use iggy::identifier::Identifier; |
| use iggy::messages::append_messages::{AppendMessages, AppendableMessage, Partitioning}; |
| use iggy::messages::poll_messages::{PollMessages, PollingStrategy}; |
| use iggy::partitions::create_partitions::CreatePartitions; |
| use iggy::partitions::delete_partitions::DeletePartitions; |
| use iggy::streams::create_stream::CreateStream; |
| use iggy::streams::delete_stream::DeleteStream; |
| use iggy::streams::get_stream::GetStream; |
| use iggy::streams::get_streams::GetStreams; |
| use iggy::streams::purge_stream::PurgeStream; |
| use iggy::streams::update_stream::UpdateStream; |
| use iggy::system::get_clients::GetClients; |
| use iggy::system::get_me::GetMe; |
| use iggy::system::get_stats::GetStats; |
| use iggy::system::ping::Ping; |
| use iggy::topics::create_topic::CreateTopic; |
| use iggy::topics::delete_topic::DeleteTopic; |
| use iggy::topics::get_topic::GetTopic; |
| use iggy::topics::get_topics::GetTopics; |
| use iggy::topics::purge_topic::PurgeTopic; |
| use iggy::topics::update_topic::UpdateTopic; |
| use iggy::users::defaults::*; |
| use iggy::users::login_user::LoginUser; |
| use iggy::utils::byte_size::IggyByteSize; |
| use integration::test_server::{assert_clean_system, ClientFactory}; |
| |
| const STREAM_ID: u32 = 1; |
| const TOPIC_ID: u32 = 1; |
| const PARTITION_ID: u32 = 1; |
| const PARTITIONS_COUNT: u32 = 3; |
| const CONSUMER_ID: u32 = 1; |
| const CONSUMER_KIND: ConsumerKind = ConsumerKind::Consumer; |
| const STREAM_NAME: &str = "test-stream"; |
| const TOPIC_NAME: &str = "test-topic"; |
| const CONSUMER_GROUP_ID: u32 = 10; |
| const CONSUMER_GROUP_NAME: &str = "test-consumer-group"; |
| const MESSAGES_COUNT: u32 = 1000; |
| |
| pub async fn run(client_factory: &dyn ClientFactory) { |
| let client = client_factory.create_client().await; |
| let client = IggyClient::create(client, IggyClientConfig::default(), None, None, None); |
| |
| // 0. Ping server |
| let ping = Ping {}; |
| client.ping(&ping).await.unwrap(); |
| |
| // 1. Login as root user |
| client |
| .login_user(&LoginUser { |
| username: DEFAULT_ROOT_USERNAME.to_string(), |
| password: DEFAULT_ROOT_PASSWORD.to_string(), |
| }) |
| .await |
| .unwrap(); |
| |
| // 2. Ensure that streams do not exist |
| let streams = client.get_streams(&GetStreams {}).await.unwrap(); |
| assert!(streams.is_empty()); |
| |
| // 3. Create the stream |
| let mut create_stream = CreateStream { |
| stream_id: Some(STREAM_ID), |
| name: STREAM_NAME.to_string(), |
| }; |
| client.create_stream(&create_stream).await.unwrap(); |
| |
| // 4. Get streams and validate that created stream exists |
| let streams = client.get_streams(&GetStreams {}).await.unwrap(); |
| assert_eq!(streams.len(), 1); |
| let stream = streams.first().unwrap(); |
| assert_eq!(stream.id, STREAM_ID); |
| assert_eq!(stream.name, STREAM_NAME); |
| assert_eq!(stream.topics_count, 0); |
| assert_eq!(stream.size_bytes, 0); |
| assert_eq!(stream.messages_count, 0); |
| |
| // 5. Get stream details by ID |
| let stream = client |
| .get_stream(&GetStream { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(stream.id, STREAM_ID); |
| assert_eq!(stream.name, STREAM_NAME); |
| assert_eq!(stream.topics_count, 0); |
| assert!(stream.topics.is_empty()); |
| assert_eq!(stream.size_bytes, 0); |
| assert_eq!(stream.messages_count, 0); |
| |
| // 6. Get stream details by name |
| let stream = client |
| .get_stream(&GetStream { |
| stream_id: Identifier::named(STREAM_NAME).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(stream.id, STREAM_ID); |
| assert_eq!(stream.name, STREAM_NAME); |
| |
| // 7. Try to create the stream with the same ID but the different name and validate that it fails |
| create_stream.name = format!("{}-2", STREAM_NAME); |
| let create_stream_result = client.create_stream(&create_stream).await; |
| assert!(create_stream_result.is_err()); |
| |
| // 8. Try to create the stream with the same name but the different ID and validate that it fails |
| create_stream.stream_id = Some(STREAM_ID + 1); |
| create_stream.name = STREAM_NAME.to_string(); |
| let create_stream_result = client.create_stream(&create_stream).await; |
| assert!(create_stream_result.is_err()); |
| |
| // 9. Create the topic |
| let mut create_topic = CreateTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Some(TOPIC_ID), |
| partitions_count: PARTITIONS_COUNT, |
| name: TOPIC_NAME.to_string(), |
| message_expiry: None, |
| max_topic_size: None, |
| replication_factor: 1, |
| }; |
| client.create_topic(&create_topic).await.unwrap(); |
| |
| // 10. Get topics and validate that created topic exists |
| let topics = client |
| .get_topics(&GetTopics { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(topics.len(), 1); |
| let topic = topics.first().unwrap(); |
| assert_eq!(topic.id, TOPIC_ID); |
| assert_eq!(topic.name, TOPIC_NAME); |
| assert_eq!(topic.partitions_count, PARTITIONS_COUNT); |
| assert_eq!(topic.size, 0); |
| assert_eq!(topic.messages_count, 0); |
| assert_eq!(topic.message_expiry, None); |
| assert_eq!(topic.max_topic_size, None); |
| assert_eq!(topic.replication_factor, 1); |
| |
| // 11. Get topic details by ID |
| let topic = client |
| .get_topic(&GetTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(topic.id, TOPIC_ID); |
| assert_eq!(topic.name, TOPIC_NAME); |
| assert_eq!(topic.partitions_count, PARTITIONS_COUNT); |
| assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize); |
| assert_eq!(topic.size, 0); |
| assert_eq!(topic.messages_count, 0); |
| let mut id = 1; |
| for topic_partition in topic.partitions { |
| assert_eq!(topic_partition.id, id); |
| assert_eq!(topic_partition.segments_count, 1); |
| assert_eq!(topic_partition.size_bytes, 0); |
| assert_eq!(topic_partition.current_offset, 0); |
| assert_eq!(topic_partition.messages_count, 0); |
| id += 1; |
| } |
| |
| // 12. Get topic details by name |
| let topic = client |
| .get_topic(&GetTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::named(TOPIC_NAME).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(topic.id, TOPIC_ID); |
| assert_eq!(topic.name, TOPIC_NAME); |
| |
| // 13. Get stream details and validate that created topic exists |
| let stream = client |
| .get_stream(&GetStream { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(stream.id, STREAM_ID); |
| assert_eq!(stream.name, STREAM_NAME); |
| assert_eq!(stream.topics_count, 1); |
| assert_eq!(stream.topics.len(), 1); |
| assert_eq!(stream.messages_count, 0); |
| let stream_topic = stream.topics.first().unwrap(); |
| assert_eq!(stream_topic.id, topic.id); |
| assert_eq!(stream_topic.name, topic.name); |
| assert_eq!(stream_topic.partitions_count, topic.partitions_count); |
| assert_eq!(stream_topic.size, 0); |
| assert_eq!(stream_topic.messages_count, 0); |
| |
| // 15. Try to create the topic with the same ID but the different name and validate that it fails |
| create_topic.name = format!("{}-2", TOPIC_NAME); |
| let create_topic_result = client.create_topic(&create_topic).await; |
| assert!(create_topic_result.is_err()); |
| |
| // 16. Try to create the topic with the different ID but the same name and validate that it fails |
| create_topic.topic_id = Some(TOPIC_ID + 1); |
| create_topic.name = TOPIC_NAME.to_string(); |
| let create_topic_result = client.create_topic(&create_topic).await; |
| assert!(create_topic_result.is_err()); |
| |
| // 17. Send messages to the specific topic and partition |
| let messages = create_messages(); |
| let mut send_messages = AppendMessages { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partitioning: Partitioning::partition_id(PARTITION_ID), |
| messages, |
| }; |
| client.send_messages(&mut send_messages).await.unwrap(); |
| |
| // 18. Poll messages from the specific partition in topic |
| let poll_messages = PollMessages { |
| consumer: Consumer { |
| kind: CONSUMER_KIND, |
| id: Identifier::numeric(CONSUMER_ID).unwrap(), |
| }, |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partition_id: Some(PARTITION_ID), |
| strategy: PollingStrategy::offset(0), |
| count: MESSAGES_COUNT, |
| auto_commit: false, |
| }; |
| |
| let polled_messages = client.poll_messages(&poll_messages).await.unwrap(); |
| assert_eq!(polled_messages.messages.len() as u32, MESSAGES_COUNT); |
| for i in 0..MESSAGES_COUNT { |
| let offset = i as u64; |
| let message = polled_messages.messages.get(i as usize).unwrap(); |
| assert_message(message, offset); |
| } |
| |
| // 19. Messages should be also polled in the smaller batches |
| let batches_count = 10; |
| let batch_size = MESSAGES_COUNT / batches_count; |
| for i in 0..batches_count { |
| let start_offset = (i * batch_size) as u64; |
| let poll_messages = PollMessages { |
| consumer: Consumer { |
| kind: CONSUMER_KIND, |
| id: Identifier::numeric(CONSUMER_ID).unwrap(), |
| }, |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partition_id: Some(PARTITION_ID), |
| strategy: PollingStrategy::offset(start_offset), |
| count: batch_size, |
| auto_commit: false, |
| }; |
| |
| let polled_messages = client.poll_messages(&poll_messages).await.unwrap(); |
| assert_eq!(polled_messages.messages.len() as u32, batch_size); |
| for i in 0..batch_size as u64 { |
| let offset = start_offset + i; |
| let message = polled_messages.messages.get(i as usize).unwrap(); |
| assert_message(message, offset); |
| } |
| } |
| |
| // 20. Get topic details and validate the partition details |
| let topic = client |
| .get_topic(&GetTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(topic.id, TOPIC_ID); |
| assert_eq!(topic.name, TOPIC_NAME); |
| assert_eq!(topic.partitions_count, PARTITIONS_COUNT); |
| assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize); |
| assert!(topic.size > 0); |
| assert_eq!(topic.messages_count, MESSAGES_COUNT as u64); |
| let topic_partition = topic.partitions.get((PARTITION_ID - 1) as usize).unwrap(); |
| assert_eq!(topic_partition.id, PARTITION_ID); |
| assert_eq!(topic_partition.segments_count, 1); |
| assert!(topic_partition.size_bytes > 0); |
| assert_eq!(topic_partition.current_offset, (MESSAGES_COUNT - 1) as u64); |
| assert_eq!(topic_partition.messages_count, MESSAGES_COUNT as u64); |
| |
| // 21. Ensure that messages do not exist in the second partition in the same topic |
| let poll_messages = PollMessages { |
| consumer: Consumer { |
| kind: CONSUMER_KIND, |
| id: Identifier::numeric(CONSUMER_ID).unwrap(), |
| }, |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partition_id: Some(PARTITION_ID + 1), |
| strategy: PollingStrategy::offset(0), |
| count: MESSAGES_COUNT, |
| auto_commit: false, |
| }; |
| let polled_messages = client.poll_messages(&poll_messages).await.unwrap(); |
| assert!(polled_messages.messages.is_empty()); |
| |
| // 22. Get the existing customer offset and ensure it's 0 |
| let offset = client |
| .get_consumer_offset(&GetConsumerOffset { |
| consumer: Consumer { |
| kind: CONSUMER_KIND, |
| id: Identifier::numeric(CONSUMER_ID).unwrap(), |
| }, |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partition_id: Some(PARTITION_ID), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(offset.partition_id, PARTITION_ID); |
| assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64); |
| assert_eq!(offset.stored_offset, 0); |
| |
| // 23. Store the consumer offset |
| let stored_offset = 10; |
| client |
| .store_consumer_offset(&StoreConsumerOffset { |
| consumer: Consumer { |
| kind: CONSUMER_KIND, |
| id: Identifier::numeric(CONSUMER_ID).unwrap(), |
| }, |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partition_id: Some(PARTITION_ID), |
| offset: stored_offset, |
| }) |
| .await |
| .unwrap(); |
| |
| // 24. Get the existing customer offset and ensure it's the previously stored value |
| let offset = client |
| .get_consumer_offset(&GetConsumerOffset { |
| consumer: Consumer { |
| kind: CONSUMER_KIND, |
| id: Identifier::numeric(CONSUMER_ID).unwrap(), |
| }, |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partition_id: Some(PARTITION_ID), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(offset.partition_id, PARTITION_ID); |
| assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64); |
| assert_eq!(offset.stored_offset, stored_offset); |
| |
| // 25. Poll messages from the specific partition in topic using next with auto commit |
| let messages_count = 10; |
| let poll_messages = PollMessages { |
| consumer: Consumer { |
| kind: CONSUMER_KIND, |
| id: Identifier::numeric(CONSUMER_ID).unwrap(), |
| }, |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partition_id: Some(PARTITION_ID), |
| strategy: PollingStrategy::next(), |
| count: messages_count, |
| auto_commit: true, |
| }; |
| |
| let polled_messages = client.poll_messages(&poll_messages).await.unwrap(); |
| assert_eq!(polled_messages.messages.len() as u32, messages_count); |
| let first_offset = polled_messages.messages.first().unwrap().offset; |
| let last_offset = polled_messages.messages.last().unwrap().offset; |
| let expected_last_offset = stored_offset + messages_count as u64; |
| assert_eq!(first_offset, stored_offset + 1); |
| assert_eq!(last_offset, expected_last_offset); |
| |
| // 26. Get the existing customer offset and ensure that auto commit during poll has worked |
| let offset = client |
| .get_consumer_offset(&GetConsumerOffset { |
| consumer: Consumer { |
| kind: CONSUMER_KIND, |
| id: Identifier::numeric(CONSUMER_ID).unwrap(), |
| }, |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partition_id: Some(PARTITION_ID), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(offset.partition_id, PARTITION_ID); |
| assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64); |
| assert_eq!(offset.stored_offset, expected_last_offset); |
| |
| // 27. Get the consumer groups and validate that there are no groups |
| let consumer_groups = client |
| .get_consumer_groups(&GetConsumerGroups { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| assert!(consumer_groups.is_empty()); |
| |
| // 28. Create the consumer group |
| client |
| .create_consumer_group(&CreateConsumerGroup { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| consumer_group_id: CONSUMER_GROUP_ID, |
| name: CONSUMER_GROUP_NAME.to_string(), |
| }) |
| .await |
| .unwrap(); |
| |
| // 29. Get the consumer groups and validate that there is one group |
| let consumer_groups = client |
| .get_consumer_groups(&GetConsumerGroups { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| assert_eq!(consumer_groups.len(), 1); |
| let consumer_group = consumer_groups.first().unwrap(); |
| assert_eq!(consumer_group.id, CONSUMER_GROUP_ID); |
| assert_eq!(consumer_group.partitions_count, PARTITIONS_COUNT); |
| assert_eq!(consumer_group.members_count, 0); |
| |
| // 30. Get the consumer group details |
| let consumer_group = client |
| .get_consumer_group(&GetConsumerGroup { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| consumer_group_id: Identifier::numeric(CONSUMER_GROUP_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| assert_eq!(consumer_group.id, CONSUMER_GROUP_ID); |
| assert_eq!(consumer_group.partitions_count, PARTITIONS_COUNT); |
| assert_eq!(consumer_group.members_count, 0); |
| assert!(consumer_group.members.is_empty()); |
| |
| // 31. Join the consumer group and then leave it if the feature is available |
| let result = client |
| .join_consumer_group(&JoinConsumerGroup { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| consumer_group_id: Identifier::numeric(CONSUMER_GROUP_ID).unwrap(), |
| }) |
| .await; |
| |
| match result { |
| Ok(_) => { |
| let consumer_group = client |
| .get_consumer_group(&GetConsumerGroup { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| consumer_group_id: Identifier::numeric(CONSUMER_GROUP_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(consumer_group.id, CONSUMER_GROUP_ID); |
| assert_eq!(consumer_group.partitions_count, PARTITIONS_COUNT); |
| assert_eq!(consumer_group.name, CONSUMER_GROUP_NAME); |
| assert_eq!(consumer_group.members_count, 1); |
| assert_eq!(consumer_group.members.len(), 1); |
| let member = &consumer_group.members[0]; |
| assert_eq!(member.partitions_count, PARTITIONS_COUNT); |
| |
| let me = client.get_me(&GetMe {}).await.unwrap(); |
| assert!(me.client_id > 0); |
| assert_eq!(me.consumer_groups_count, 1); |
| assert_eq!(me.consumer_groups.len(), 1); |
| let consumer_group = &me.consumer_groups[0]; |
| assert_eq!(consumer_group.stream_id, STREAM_ID); |
| assert_eq!(consumer_group.topic_id, TOPIC_ID); |
| assert_eq!(consumer_group.consumer_group_id, CONSUMER_GROUP_ID); |
| |
| client |
| .leave_consumer_group(&LeaveConsumerGroup { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| consumer_group_id: Identifier::numeric(CONSUMER_GROUP_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| let consumer_group = client |
| .get_consumer_group(&GetConsumerGroup { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| consumer_group_id: Identifier::numeric(CONSUMER_GROUP_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert_eq!(consumer_group.members_count, 0); |
| assert!(consumer_group.members.is_empty()); |
| |
| let me = client.get_me(&GetMe {}).await.unwrap(); |
| assert_eq!(me.consumer_groups_count, 0); |
| assert!(me.consumer_groups.is_empty()); |
| } |
| Err(e) => assert_eq!(e.as_code(), IggyError::FeatureUnavailable.as_code()), |
| } |
| |
| // 32. Get the stats and validate that there is one stream |
| let stats = client.get_stats(&GetStats {}).await.unwrap(); |
| assert!(!stats.hostname.is_empty()); |
| assert!(!stats.os_name.is_empty()); |
| assert!(!stats.os_version.is_empty()); |
| assert!(!stats.kernel_version.is_empty()); |
| assert_eq!(stats.streams_count, 1); |
| assert_eq!(stats.topics_count, 1); |
| assert_eq!(stats.partitions_count, PARTITIONS_COUNT); |
| assert_eq!(stats.segments_count, PARTITIONS_COUNT); |
| assert_eq!(stats.messages_count, MESSAGES_COUNT as u64); |
| |
| // 33. Delete the consumer group |
| client |
| .delete_consumer_group(&DeleteConsumerGroup { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| consumer_group_id: Identifier::numeric(CONSUMER_GROUP_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| // 34. Create new partitions and validate that the number of partitions is increased |
| client |
| .create_partitions(&CreatePartitions { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partitions_count: PARTITIONS_COUNT, |
| }) |
| .await |
| .unwrap(); |
| |
| let topic = client |
| .get_topic(&GetTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| assert_eq!(topic.partitions_count, 2 * PARTITIONS_COUNT); |
| |
| // 35. Delete the partitions and validate that the number of partitions is decreased |
| client |
| .delete_partitions(&DeletePartitions { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partitions_count: PARTITIONS_COUNT, |
| }) |
| .await |
| .unwrap(); |
| |
| let topic = client |
| .get_topic(&GetTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| assert_eq!(topic.partitions_count, PARTITIONS_COUNT); |
| |
| // 36. Update the existing topic and ensure it's updated |
| let updated_topic_name = format!("{}-updated", TOPIC_NAME); |
| let updated_message_expiry = 1000; |
| let updated_max_topic_size = IggyByteSize::from(0x1337); |
| let updated_replication_factor = 5; |
| |
| client |
| .update_topic(&UpdateTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| name: updated_topic_name.clone(), |
| message_expiry: Some(updated_message_expiry), |
| max_topic_size: Some(updated_max_topic_size), |
| replication_factor: updated_replication_factor, |
| }) |
| .await |
| .unwrap(); |
| |
| let updated_topic = client |
| .get_topic(&GetTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| assert_eq!(updated_topic.name, updated_topic_name); |
| assert_eq!(updated_topic.message_expiry, Some(updated_message_expiry)); |
| assert_eq!(updated_topic.max_topic_size, Some(updated_max_topic_size)); |
| assert_eq!(updated_topic.replication_factor, updated_replication_factor); |
| |
| // 37. Purge the existing topic and ensure it has no messages |
| client |
| .purge_topic(&PurgeTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| let polled_messages = client.poll_messages(&poll_messages).await.unwrap(); |
| assert_eq!(polled_messages.current_offset, 0); |
| assert!(polled_messages.messages.is_empty()); |
| |
| // 38. Update the existing stream and ensure it's updated |
| let updated_stream_name = format!("{}-updated", STREAM_NAME); |
| |
| client |
| .update_stream(&UpdateStream { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| name: updated_stream_name.clone(), |
| }) |
| .await |
| .unwrap(); |
| |
| let updated_stream = client |
| .get_stream(&GetStream { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| assert_eq!(updated_stream.name, updated_stream_name); |
| |
| // 39. Purge the existing stream and ensure it has no messages |
| let messages = create_messages(); |
| let mut send_messages = AppendMessages { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| partitioning: Partitioning::partition_id(PARTITION_ID), |
| messages, |
| }; |
| client.send_messages(&mut send_messages).await.unwrap(); |
| |
| client |
| .purge_stream(&PurgeStream { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| let polled_messages = client.poll_messages(&poll_messages).await.unwrap(); |
| assert_eq!(polled_messages.current_offset, 0); |
| assert!(polled_messages.messages.is_empty()); |
| |
| // 40. Delete the existing topic and ensure it doesn't exist anymore |
| client |
| .delete_topic(&DeleteTopic { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| topic_id: Identifier::numeric(TOPIC_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| let topics = client |
| .get_topics(&GetTopics { |
| stream_id: Identifier::numeric(STREAM_ID).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| assert!(topics.is_empty()); |
| |
| // 41. Create the stream with automatically generated ID on the server |
| let stream_name = format!("{}-auto", STREAM_NAME); |
| let stream_id = STREAM_ID + 1; |
| let create_stream = CreateStream { |
| stream_id: None, |
| name: stream_name.clone(), |
| }; |
| client.create_stream(&create_stream).await.unwrap(); |
| |
| let stream = client |
| .get_stream(&GetStream { |
| stream_id: Identifier::numeric(stream_id).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| assert_eq!(stream.id, stream_id); |
| assert_eq!(stream.name, stream_name); |
| |
| // 42. Create the topic with automatically generated ID on the server |
| let topic_name = format!("{}-auto", TOPIC_NAME); |
| let topic_id = 1; |
| let create_topic = CreateTopic { |
| stream_id: Identifier::numeric(stream_id).unwrap(), |
| topic_id: None, |
| partitions_count: PARTITIONS_COUNT, |
| name: topic_name.clone(), |
| message_expiry: None, |
| max_topic_size: None, |
| replication_factor: 1, |
| }; |
| |
| client.create_topic(&create_topic).await.unwrap(); |
| |
| let topic = client |
| .get_topic(&GetTopic { |
| stream_id: Identifier::numeric(stream_id).unwrap(), |
| topic_id: Identifier::numeric(topic_id).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| |
| assert_eq!(topic.id, topic_id); |
| assert_eq!(topic.name, topic_name); |
| |
| // 43. Delete the existing streams and ensure there's no streams left |
| let streams = client.get_streams(&GetStreams {}).await.unwrap(); |
| assert_eq!(streams.len(), 2); |
| |
| for stream in streams { |
| client |
| .delete_stream(&DeleteStream { |
| stream_id: Identifier::numeric(stream.id).unwrap(), |
| }) |
| .await |
| .unwrap(); |
| } |
| |
| let streams = client.get_streams(&GetStreams {}).await.unwrap(); |
| assert!(streams.is_empty()); |
| |
| // 44. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client |
| let clients = client.get_clients(&GetClients {}).await.unwrap(); |
| |
| assert!(clients.len() <= 1); |
| |
| assert_clean_system(&client).await; |
| } |
| |
| fn assert_message(message: &iggy::models::polled_messages::PolledMessage, offset: u64) { |
| let expected_payload = get_message_payload(offset); |
| assert!(message.timestamp > 0); |
| assert_eq!(message.offset, offset); |
| assert_eq!(message.payload, expected_payload); |
| } |
| |
| fn create_messages() -> Vec<AppendableMessage> { |
| let mut messages = Vec::new(); |
| for offset in 0..MESSAGES_COUNT { |
| let id = (offset + 1) as u128; |
| let payload = get_message_payload(offset as u64); |
| messages.push(AppendableMessage { |
| id, |
| length: payload.len() as u32, |
| payload, |
| headers: None, |
| }); |
| } |
| messages |
| } |
| |
| fn get_message_payload(offset: u64) -> Bytes { |
| Bytes::from(format!("message {}", offset)) |
| } |