blob: dab88fa4e73c89904694eb64b2cd5ca21118666a [file] [log] [blame]
use crate::compression::compression_algorithm::CompressionAlgorithm;
use crate::consumer::Consumer;
use crate::diagnostic::DiagnosticEvent;
use crate::error::IggyError;
use crate::identifier::Identifier;
use crate::messages::poll_messages::PollingStrategy;
use crate::messages::send_messages::{Message, Partitioning};
use crate::models::client_info::{ClientInfo, ClientInfoDetails};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
use crate::models::messages::PolledMessages;
use crate::models::permissions::Permissions;
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
use crate::models::snapshot::Snapshot;
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
use crate::models::user_info::{UserInfo, UserInfoDetails};
use crate::models::user_status::UserStatus;
use crate::snapshot::{SnapshotCompression, SystemSnapshotType};
use crate::tcp::config::{TcpClientConfig, TcpClientReconnectionConfig};
use crate::utils::duration::IggyDuration;
use crate::utils::expiry::IggyExpiry;
use crate::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
use crate::utils::topic_size::MaxTopicSize;
use async_broadcast::Receiver;
use async_trait::async_trait;
use std::fmt::Debug;
use std::str::FromStr;
const CONNECTION_STRING_PREFIX: &str = "iggy://";
#[derive(Debug)]
pub(crate) struct ConnectionString {
server_address: String,
auto_login: AutoLogin,
options: ConnectionStringOptions,
}
#[derive(Debug, Clone, PartialEq)]
pub enum AutoLogin {
Disabled,
Enabled(Credentials),
}
#[derive(Debug, Clone, PartialEq)]
pub enum Credentials {
UsernamePassword(String, String),
PersonalAccessToken(String),
}
/// The client trait which is the main interface to the Iggy server.
/// It consists of multiple modules, each of which is responsible for a specific set of commands.
/// Except the ping, login and get me, all the other methods require authentication.
#[async_trait]
pub trait Client:
SystemClient
+ UserClient
+ PersonalAccessTokenClient
+ StreamClient
+ TopicClient
+ PartitionClient
+ MessageClient
+ ConsumerOffsetClient
+ ConsumerGroupClient
+ Sync
+ Send
+ Debug
{
/// Connect to the server. Depending on the selected transport and provided configuration it might also perform authentication, retry logic etc.
/// If the client is already connected, it will do nothing.
async fn connect(&self) -> Result<(), IggyError>;
/// Disconnect from the server. If the client is not connected, it will do nothing.
async fn disconnect(&self) -> Result<(), IggyError>;
// Shutdown the client and release all the resources.
async fn shutdown(&self) -> Result<(), IggyError>;
/// Subscribe to diagnostic events.
async fn subscribe_events(&self) -> Receiver<DiagnosticEvent>;
}
/// This trait defines the methods to interact with the system module.
#[async_trait]
pub trait SystemClient {
/// Get the stats of the system such as PID, memory usage, streams count etc.
///
/// Authentication is required, and the permission to read the server info.
async fn get_stats(&self) -> Result<Stats, IggyError>;
/// Get the info about the currently connected client (not to be confused with the user).
///
/// Authentication is required.
async fn get_me(&self) -> Result<ClientInfoDetails, IggyError>;
/// Get the info about a specific client by unique ID (not to be confused with the user).
///
/// Authentication is required, and the permission to read the server info.
async fn get_client(&self, client_id: u32) -> Result<Option<ClientInfoDetails>, IggyError>;
/// Get the info about all the currently connected clients (not to be confused with the users).
///
/// Authentication is required, and the permission to read the server info.
async fn get_clients(&self) -> Result<Vec<ClientInfo>, IggyError>;
/// Ping the server to check if it's alive.
async fn ping(&self) -> Result<(), IggyError>;
async fn heartbeat_interval(&self) -> IggyDuration;
/// Capture and package the current system state as a snapshot.
///
/// Authentication is required.
async fn snapshot(
&self,
compression: SnapshotCompression,
snapshot_types: Vec<SystemSnapshotType>,
) -> Result<Snapshot, IggyError>;
}
/// This trait defines the methods to interact with the user module.
#[async_trait]
pub trait UserClient {
/// Get the info about a specific user by unique ID or username.
///
/// Authentication is required, and the permission to read the users, unless the provided user ID is the same as the authenticated user.
async fn get_user(&self, user_id: &Identifier) -> Result<Option<UserInfoDetails>, IggyError>;
/// Get the info about all the users.
///
/// Authentication is required, and the permission to read the users.
async fn get_users(&self) -> Result<Vec<UserInfo>, IggyError>;
/// Create a new user.
///
/// Authentication is required, and the permission to manage the users.
async fn create_user(
&self,
username: &str,
password: &str,
status: UserStatus,
permissions: Option<Permissions>,
) -> Result<UserInfoDetails, IggyError>;
/// Delete a user by unique ID or username.
///
/// Authentication is required, and the permission to manage the users.
async fn delete_user(&self, user_id: &Identifier) -> Result<(), IggyError>;
/// Update a user by unique ID or username.
///
/// Authentication is required, and the permission to manage the users.
async fn update_user(
&self,
user_id: &Identifier,
username: Option<&str>,
status: Option<UserStatus>,
) -> Result<(), IggyError>;
/// Update the permissions of a user by unique ID or username.
///
/// Authentication is required, and the permission to manage the users.
async fn update_permissions(
&self,
user_id: &Identifier,
permissions: Option<Permissions>,
) -> Result<(), IggyError>;
/// Change the password of a user by unique ID or username.
///
/// Authentication is required, and the permission to manage the users, unless the provided user ID is the same as the authenticated user.
async fn change_password(
&self,
user_id: &Identifier,
current_password: &str,
new_password: &str,
) -> Result<(), IggyError>;
/// Login a user by username and password.
async fn login_user(&self, username: &str, password: &str) -> Result<IdentityInfo, IggyError>;
/// Logout the currently authenticated user.
async fn logout_user(&self) -> Result<(), IggyError>;
}
/// This trait defines the methods to interact with the personal access token module.
#[async_trait]
pub trait PersonalAccessTokenClient {
/// Get the info about all the personal access tokens of the currently authenticated user.
async fn get_personal_access_tokens(&self) -> Result<Vec<PersonalAccessTokenInfo>, IggyError>;
/// Create a new personal access token for the currently authenticated user.
async fn create_personal_access_token(
&self,
name: &str,
expiry: PersonalAccessTokenExpiry,
) -> Result<RawPersonalAccessToken, IggyError>;
/// Delete a personal access token of the currently authenticated user by unique token name.
async fn delete_personal_access_token(&self, name: &str) -> Result<(), IggyError>;
/// Login the user with the provided personal access token.
async fn login_with_personal_access_token(
&self,
token: &str,
) -> Result<IdentityInfo, IggyError>;
}
/// This trait defines the methods to interact with the stream module.
#[async_trait]
pub trait StreamClient {
/// Get the info about a specific stream by unique ID or name.
///
/// Authentication is required, and the permission to read the streams.
async fn get_stream(&self, stream_id: &Identifier) -> Result<Option<StreamDetails>, IggyError>;
/// Get the info about all the streams.
///
/// Authentication is required, and the permission to read the streams.
async fn get_streams(&self) -> Result<Vec<Stream>, IggyError>;
/// Create a new stream.
///
/// Authentication is required, and the permission to manage the streams.
async fn create_stream(
&self,
name: &str,
stream_id: Option<u32>,
) -> Result<StreamDetails, IggyError>;
/// Update a stream by unique ID or name.
///
/// Authentication is required, and the permission to manage the streams.
async fn update_stream(&self, stream_id: &Identifier, name: &str) -> Result<(), IggyError>;
/// Delete a stream by unique ID or name.
///
/// Authentication is required, and the permission to manage the streams.
async fn delete_stream(&self, stream_id: &Identifier) -> Result<(), IggyError>;
/// Purge a stream by unique ID or name.
///
/// Authentication is required, and the permission to manage the streams.
async fn purge_stream(&self, stream_id: &Identifier) -> Result<(), IggyError>;
}
/// This trait defines the methods to interact with the topic module.
#[allow(clippy::too_many_arguments)]
#[async_trait]
pub trait TopicClient {
/// Get the info about a specific topic by unique ID or name.
///
/// Authentication is required, and the permission to read the topics.
async fn get_topic(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<Option<TopicDetails>, IggyError>;
/// Get the info about all the topics.
///
/// Authentication is required, and the permission to read the topics.
async fn get_topics(&self, stream_id: &Identifier) -> Result<Vec<Topic>, IggyError>;
/// Create a new topic.
///
/// Authentication is required, and the permission to manage the topics.
#[allow(clippy::too_many_arguments)]
async fn create_topic(
&self,
stream_id: &Identifier,
name: &str,
partitions_count: u32,
compression_algorithm: CompressionAlgorithm,
replication_factor: Option<u8>,
topic_id: Option<u32>,
message_expiry: IggyExpiry,
max_topic_size: MaxTopicSize,
) -> Result<TopicDetails, IggyError>;
/// Update a topic by unique ID or name.
///
/// Authentication is required, and the permission to manage the topics.
async fn update_topic(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
name: &str,
compression_algorithm: CompressionAlgorithm,
replication_factor: Option<u8>,
message_expiry: IggyExpiry,
max_topic_size: MaxTopicSize,
) -> Result<(), IggyError>;
/// Delete a topic by unique ID or name.
///
/// Authentication is required, and the permission to manage the topics.
async fn delete_topic(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<(), IggyError>;
/// Purge a topic by unique ID or name.
///
/// Authentication is required, and the permission to manage the topics.
async fn purge_topic(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<(), IggyError>;
}
/// This trait defines the methods to interact with the partition module.
#[async_trait]
pub trait PartitionClient {
/// Create new N partitions for a topic by unique ID or name.
///
/// For example, given a topic with 3 partitions, if you create 2 partitions, the topic will have 5 partitions (from 1 to 5).
///
/// Authentication is required, and the permission to manage the partitions.
async fn create_partitions(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partitions_count: u32,
) -> Result<(), IggyError>;
/// Delete last N partitions for a topic by unique ID or name.
///
/// For example, given a topic with 5 partitions, if you delete 2 partitions, the topic will have 3 partitions left (from 1 to 3).
///
/// Authentication is required, and the permission to manage the partitions.
async fn delete_partitions(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partitions_count: u32,
) -> Result<(), IggyError>;
}
/// This trait defines the methods to interact with the messaging module.
#[async_trait]
pub trait MessageClient {
/// Poll given amount of messages using the specified consumer and strategy from the specified stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to poll the messages.
#[allow(clippy::too_many_arguments)]
async fn poll_messages(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
consumer: &Consumer,
strategy: &PollingStrategy,
count: u32,
auto_commit: bool,
) -> Result<PolledMessages, IggyError>;
/// Send messages using specified partitioning strategy to the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to send the messages.
async fn send_messages(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partitioning: &Partitioning,
messages: &mut [Message],
) -> Result<(), IggyError>;
/// Force flush of the `unsaved_messages` buffer to disk, optionally fsyncing the data.
#[allow(clippy::too_many_arguments)]
async fn flush_unsaved_buffer(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: u32,
fsync: bool,
) -> Result<(), IggyError>;
}
/// This trait defines the methods to interact with the consumer offset module.
#[async_trait]
pub trait ConsumerOffsetClient {
/// Store the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to poll the messages.
async fn store_consumer_offset(
&self,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
offset: u64,
) -> Result<(), IggyError>;
/// Get the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to poll the messages.
async fn get_consumer_offset(
&self,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<Option<ConsumerOffsetInfo>, IggyError>;
/// Delete the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to poll the messages.
async fn delete_consumer_offset(
&self,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<(), IggyError>;
}
/// This trait defines the methods to interact with the consumer group module.
#[async_trait]
pub trait ConsumerGroupClient {
/// Get the info about a specific consumer group by unique ID or name for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to read the streams or topics.
async fn get_consumer_group(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
group_id: &Identifier,
) -> Result<Option<ConsumerGroupDetails>, IggyError>;
/// Get the info about all the consumer groups for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to read the streams or topics.
async fn get_consumer_groups(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
) -> Result<Vec<ConsumerGroup>, IggyError>;
/// Create a new consumer group for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to manage the streams or topics.
async fn create_consumer_group(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
name: &str,
group_id: Option<u32>,
) -> Result<ConsumerGroupDetails, IggyError>;
/// Delete a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to manage the streams or topics.
async fn delete_consumer_group(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
group_id: &Identifier,
) -> Result<(), IggyError>;
/// Join a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to read the streams or topics.
async fn join_consumer_group(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
group_id: &Identifier,
) -> Result<(), IggyError>;
/// Leave a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to read the streams or topics.
async fn leave_consumer_group(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
group_id: &Identifier,
) -> Result<(), IggyError>;
}
impl FromStr for ConnectionString {
type Err = IggyError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
ConnectionString::new(s)
}
}
impl ConnectionString {
pub fn new(connection_string: &str) -> Result<Self, IggyError> {
if connection_string.is_empty() {
return Err(IggyError::InvalidConnectionString);
}
if !connection_string.starts_with(CONNECTION_STRING_PREFIX) {
return Err(IggyError::InvalidConnectionString);
}
let connection_string = connection_string.replace(CONNECTION_STRING_PREFIX, "");
let parts = connection_string.split('@').collect::<Vec<&str>>();
if parts.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}
let credentials = parts[0].split(':').collect::<Vec<&str>>();
if credentials.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}
let username = credentials[0];
let password = credentials[1];
if username.is_empty() || password.is_empty() {
return Err(IggyError::InvalidConnectionString);
}
let server_and_options = parts[1].split('?').collect::<Vec<&str>>();
if server_and_options.len() > 2 {
return Err(IggyError::InvalidConnectionString);
}
let server_address = server_and_options[0];
if server_address.is_empty() {
return Err(IggyError::InvalidConnectionString);
}
if !server_address.contains(':') {
return Err(IggyError::InvalidConnectionString);
}
let port = server_address.split(':').collect::<Vec<&str>>()[1];
if port.is_empty() {
return Err(IggyError::InvalidConnectionString);
}
if port.parse::<u16>().is_err() {
return Err(IggyError::InvalidConnectionString);
}
let connection_string_options;
if let Some(options) = server_and_options.get(1) {
connection_string_options = ConnectionString::parse_options(options)?;
} else {
connection_string_options = ConnectionStringOptions::default();
}
Ok(ConnectionString {
server_address: server_address.to_owned(),
auto_login: AutoLogin::Enabled(Credentials::UsernamePassword(
username.to_owned(),
password.to_owned(),
)),
options: connection_string_options,
})
}
fn parse_options(options: &str) -> Result<ConnectionStringOptions, IggyError> {
let options = options.split('&').collect::<Vec<&str>>();
let mut tls_enabled = false;
let mut tls_domain = "localhost".to_string();
let mut reconnection_retries = "unlimited".to_owned();
let mut reconnection_interval = "1s".to_owned();
let mut reestablish_after = "5s".to_owned();
let mut heartbeat_interval = "5s".to_owned();
for option in options {
let option_parts = option.split('=').collect::<Vec<&str>>();
if option_parts.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}
match option_parts[0] {
"tls" => {
tls_enabled = option_parts[1] == "true";
}
"tls_domain" => {
tls_domain = option_parts[1].to_string();
}
"reconnection_retries" => {
reconnection_retries = option_parts[1].to_string();
}
"reconnection_interval" => {
reconnection_interval = option_parts[1].to_string();
}
"reestablish_after" => {
reestablish_after = option_parts[1].to_string();
}
"heartbeat_interval" => {
heartbeat_interval = option_parts[1].to_string();
}
_ => {
return Err(IggyError::InvalidConnectionString);
}
}
}
Ok(ConnectionStringOptions {
tls_enabled,
tls_domain,
heartbeat_interval: IggyDuration::from_str(heartbeat_interval.as_str())
.map_err(|_| IggyError::InvalidConnectionString)?,
reconnection: TcpClientReconnectionConfig {
enabled: true,
max_retries: match reconnection_retries.as_str() {
"unlimited" => None,
_ => Some(
reconnection_retries
.parse()
.map_err(|_| IggyError::InvalidNumberValue)?,
),
},
interval: IggyDuration::from_str(reconnection_interval.as_str())
.map_err(|_| IggyError::InvalidConnectionString)?,
reestablish_after: IggyDuration::from_str(reestablish_after.as_str())
.map_err(|_| IggyError::InvalidConnectionString)?,
},
})
}
}
#[derive(Debug)]
struct ConnectionStringOptions {
tls_enabled: bool,
tls_domain: String,
reconnection: TcpClientReconnectionConfig,
heartbeat_interval: IggyDuration,
}
impl Default for ConnectionStringOptions {
fn default() -> Self {
ConnectionStringOptions {
tls_enabled: false,
tls_domain: "".to_string(),
reconnection: Default::default(),
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
}
}
}
impl From<ConnectionString> for TcpClientConfig {
fn from(connection_string: ConnectionString) -> Self {
TcpClientConfig {
server_address: connection_string.server_address,
auto_login: connection_string.auto_login,
tls_enabled: connection_string.options.tls_enabled,
tls_domain: connection_string.options.tls_domain,
tls_ca_file: None,
reconnection: connection_string.options.reconnection,
heartbeat_interval: connection_string.options.heartbeat_interval,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn connection_string_without_username_should_fail() {
let server_address = "localhost:1234";
let value = format!("{CONNECTION_STRING_PREFIX}:secret@{server_address}");
let connection_string = ConnectionString::new(&value);
assert!(connection_string.is_err());
}
#[test]
fn connection_string_without_password_should_fail() {
let server_address = "localhost:1234";
let value = format!("{CONNECTION_STRING_PREFIX}user1@{server_address}");
let connection_string = ConnectionString::new(&value);
assert!(connection_string.is_err());
}
#[test]
fn connection_string_without_server_address_should_fail() {
let value = format!("{CONNECTION_STRING_PREFIX}user:secret");
let connection_string = ConnectionString::new(&value);
assert!(connection_string.is_err());
}
#[test]
fn connection_string_without_port_should_fail() {
let value = format!("{CONNECTION_STRING_PREFIX}user:secret@localhost");
let connection_string = ConnectionString::new(&value);
assert!(connection_string.is_err());
}
#[test]
fn connection_string_without_options_should_be_parsed_correctly() {
let username = "user1";
let password = "secret";
let server_address = "localhost:1234";
let value = format!("{CONNECTION_STRING_PREFIX}{username}:{password}@{server_address}");
let connection_string = ConnectionString::new(&value);
assert!(connection_string.is_ok());
let connection_string = connection_string.unwrap();
assert_eq!(connection_string.server_address, server_address);
assert_eq!(
connection_string.auto_login,
AutoLogin::Enabled(Credentials::UsernamePassword(
username.to_string(),
password.to_string()
))
);
assert!(!connection_string.options.tls_enabled);
assert!(connection_string.options.tls_domain.is_empty());
assert!(connection_string.options.reconnection.enabled);
assert!(connection_string.options.reconnection.max_retries.is_none());
assert_eq!(
connection_string.options.reconnection.interval,
IggyDuration::from_str("1s").unwrap()
);
}
#[test]
fn connection_string_with_options_should_be_parsed_correctly() {
let username = "user1";
let password = "secret";
let server_address = "localhost:1234";
let tls_domain = "test.com";
let reconnection_retries = 5;
let reconnection_interval = "5s";
let reestablish_after = "10s";
let heartbeat_interval = "3s";
let value = format!("{CONNECTION_STRING_PREFIX}{username}:{password}@{server_address}?tls=true&tls_domain={tls_domain}&reconnection_retries={reconnection_retries}&reconnection_interval={reconnection_interval}&reestablish_after={reestablish_after}&heartbeat_interval={heartbeat_interval}");
let connection_string = ConnectionString::new(&value);
assert!(connection_string.is_ok());
let connection_string = connection_string.unwrap();
assert_eq!(connection_string.server_address, server_address);
assert_eq!(
connection_string.auto_login,
AutoLogin::Enabled(Credentials::UsernamePassword(
username.to_string(),
password.to_string()
))
);
assert!(connection_string.options.tls_enabled);
assert_eq!(connection_string.options.tls_domain, tls_domain);
assert!(connection_string.options.reconnection.enabled);
assert_eq!(
connection_string.options.reconnection.max_retries,
Some(reconnection_retries)
);
assert_eq!(
connection_string.options.reconnection.interval,
IggyDuration::from_str(reconnection_interval).unwrap()
);
assert_eq!(
connection_string.options.reconnection.reestablish_after,
IggyDuration::from_str(reestablish_after).unwrap()
);
assert_eq!(
connection_string.options.heartbeat_interval,
IggyDuration::from_str(heartbeat_interval).unwrap()
);
}
}