blob: 5cb564830a133ca7e3f19d447894954349f34137 [file] [log] [blame]
use super::constants::{
MANAGE_STREAM_LONG, MANAGE_STREAM_SHORT, MANAGE_TOPICS_LONG, MANAGE_TOPICS_SHORT,
POLL_MESSAGES_LONG, POLL_MESSAGES_SHORT, READ_STREAM_LONG, READ_STREAM_SHORT, READ_TOPICS_LONG,
READ_TOPICS_SHORT, SEND_MESSAGES_LONG, SEND_MESSAGES_SHORT,
};
use crate::args::permissions::topic::TopicPermissionsArg;
use ahash::AHashMap;
use iggy::models::permissions::StreamPermissions;
use std::str::FromStr;
#[derive(Debug, PartialEq)]
pub(super) enum StreamPermission {
ManageStream,
ReadStream,
ManageTopics,
ReadTopics,
PollMessages,
SendMessages,
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct StreamPermissionError(String);
impl FromStr for StreamPermission {
type Err = StreamPermissionError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
MANAGE_STREAM_SHORT | MANAGE_STREAM_LONG => Ok(StreamPermission::ManageStream),
READ_STREAM_SHORT | READ_STREAM_LONG => Ok(StreamPermission::ReadStream),
MANAGE_TOPICS_SHORT | MANAGE_TOPICS_LONG => Ok(StreamPermission::ManageTopics),
READ_TOPICS_SHORT | READ_TOPICS_LONG => Ok(StreamPermission::ReadTopics),
POLL_MESSAGES_SHORT | POLL_MESSAGES_LONG => Ok(StreamPermission::PollMessages),
SEND_MESSAGES_SHORT | SEND_MESSAGES_LONG => Ok(StreamPermission::SendMessages),
"" => Err(StreamPermissionError("[empty]".to_owned())),
_ => Err(StreamPermissionError(s.to_owned())),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct StreamPermissionsArg {
pub(crate) stream_id: u32,
pub(crate) permissions: StreamPermissions,
}
impl From<StreamPermissionsArg> for StreamPermissions {
fn from(cmd: StreamPermissionsArg) -> Self {
cmd.permissions
}
}
impl StreamPermissionsArg {
pub(super) fn new(
stream_id: u32,
stream_permissions: Vec<StreamPermission>,
topic_permissions: Vec<TopicPermissionsArg>,
) -> Self {
let mut result = Self {
stream_id,
permissions: StreamPermissions::default(),
};
for permission in stream_permissions {
result.set_permission(permission);
}
if !topic_permissions.is_empty() {
let mut permissions = AHashMap::new();
for permission in topic_permissions {
permissions.insert(permission.topic_id, permission.permissions);
}
result.permissions.topics = Some(permissions);
}
result
}
fn set_permission(&mut self, permission: StreamPermission) {
match permission {
StreamPermission::ManageStream => self.permissions.manage_stream = true,
StreamPermission::ReadStream => self.permissions.read_stream = true,
StreamPermission::ManageTopics => self.permissions.manage_topics = true,
StreamPermission::ReadTopics => self.permissions.read_topics = true,
StreamPermission::PollMessages => self.permissions.poll_messages = true,
StreamPermission::SendMessages => self.permissions.send_messages = true,
}
}
}
impl FromStr for StreamPermissionsArg {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut p = s.split('#');
let stream_part = match p.next() {
Some(part) => part,
None => return Err("Missing stream permissions part".to_string()),
};
let mut parts = stream_part.split(':');
let stream_id = parts
.next()
.ok_or("Missing stream ID".to_string())
.and_then(|id| {
id.parse()
.map_err(|error| format!("Invalid stream ID - {}", error))
})?;
let (stream_permissions, stream_errors): (Vec<StreamPermission>, Option<String>) =
match parts.next() {
Some(permissions_str) => {
let (values, errors): (Vec<_>, Vec<_>) = permissions_str
.split(',')
.map(|s| s.parse::<StreamPermission>())
.partition(Result::is_ok);
let error = if !errors.is_empty() {
let errors = errors
.into_iter()
.map(|e| format!("\"{}\"", e.err().unwrap().0))
.collect::<Vec<String>>();
Some(format!(
"Unknown permission{} {} for stream ID: {}",
match errors.len() {
1 => "",
_ => "s",
},
errors.join(", "),
stream_id
))
} else {
None
};
(values.into_iter().map(|p| p.unwrap()).collect(), error)
}
None => (vec![], None),
};
let (topic_permissions, topic_errors): (Vec<TopicPermissionsArg>, Option<String>) = {
let (permissions, errors): (Vec<_>, Vec<_>) = p
.map(|s| s.parse::<TopicPermissionsArg>())
.partition(Result::is_ok);
let errors = if !errors.is_empty() {
Some(
errors
.into_iter()
.map(|e| e.err().unwrap())
.collect::<Vec<String>>()
.join("; "),
)
} else {
None
};
(
permissions.into_iter().map(|p| p.unwrap()).collect(),
errors,
)
};
match (stream_errors, topic_errors) {
(Some(e), Some(f)) => return Err(format!("{}; {}", e, f)),
(Some(e), None) => return Err(e),
(None, Some(f)) => return Err(f),
(None, None) => (),
}
Ok(StreamPermissionsArg::new(
stream_id,
stream_permissions,
topic_permissions,
))
}
}
#[cfg(test)]
mod tests {
use iggy::models::permissions::TopicPermissions;
use super::*;
#[test]
fn should_deserialize_single_permission() {
assert_eq!(
StreamPermission::from_str("manage_stream").unwrap(),
StreamPermission::ManageStream
);
assert_eq!(
StreamPermission::from_str("read_stream").unwrap(),
StreamPermission::ReadStream
);
assert_eq!(
StreamPermission::from_str("manage_topics").unwrap(),
StreamPermission::ManageTopics
);
assert_eq!(
StreamPermission::from_str("read_topics").unwrap(),
StreamPermission::ReadTopics
);
assert_eq!(
StreamPermission::from_str("poll_messages").unwrap(),
StreamPermission::PollMessages
);
assert_eq!(
StreamPermission::from_str("send_messages").unwrap(),
StreamPermission::SendMessages
);
}
#[test]
fn should_deserialize_single_short_permission() {
assert_eq!(
StreamPermission::from_str("m_str").unwrap(),
StreamPermission::ManageStream
);
assert_eq!(
StreamPermission::from_str("r_str").unwrap(),
StreamPermission::ReadStream
);
assert_eq!(
StreamPermission::from_str("m_top").unwrap(),
StreamPermission::ManageTopics
);
assert_eq!(
StreamPermission::from_str("r_top").unwrap(),
StreamPermission::ReadTopics
);
assert_eq!(
StreamPermission::from_str("p_msg").unwrap(),
StreamPermission::PollMessages
);
assert_eq!(
StreamPermission::from_str("s_msg").unwrap(),
StreamPermission::SendMessages
);
}
#[test]
fn should_not_deserialize_single_permission() {
let wrong_permission = StreamPermission::from_str("read_topic");
assert!(wrong_permission.is_err());
assert_eq!(
wrong_permission.unwrap_err(),
StreamPermissionError("read_topic".to_owned())
);
let empty_permission = StreamPermission::from_str("");
assert!(empty_permission.is_err());
assert_eq!(
empty_permission.unwrap_err(),
StreamPermissionError("[empty]".to_owned())
);
}
#[test]
fn should_not_deserialize_single_short_permission() {
let wrong_permission = StreamPermission::from_str("w_top");
assert!(wrong_permission.is_err());
assert_eq!(
wrong_permission.unwrap_err(),
StreamPermissionError("w_top".to_owned())
);
let wrong_permission = StreamPermission::from_str("m_msg");
assert!(wrong_permission.is_err());
assert_eq!(
wrong_permission.unwrap_err(),
StreamPermissionError("m_msg".to_owned())
);
}
#[test]
fn should_deserialize_permissions() {
assert_eq!(
StreamPermissionsArg::from_str(
"1:manage_stream,read_stream,manage_topics,read_topics,poll_messages,send_messages"
)
.unwrap(),
StreamPermissionsArg {
stream_id: 1,
permissions: StreamPermissions {
manage_stream: true,
read_stream: true,
manage_topics: true,
read_topics: true,
poll_messages: true,
send_messages: true,
topics: None,
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("1:manage_topics,read_topics").unwrap(),
StreamPermissionsArg {
stream_id: 1,
permissions: StreamPermissions {
manage_stream: false,
read_stream: false,
manage_topics: true,
read_topics: true,
poll_messages: false,
send_messages: false,
topics: None,
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("39:send_messages,read_topics,read_stream").unwrap(),
StreamPermissionsArg {
stream_id: 39,
permissions: StreamPermissions {
manage_stream: false,
read_stream: true,
manage_topics: false,
read_topics: true,
poll_messages: false,
send_messages: true,
topics: None,
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("71").unwrap(),
StreamPermissionsArg {
stream_id: 71,
permissions: StreamPermissions {
manage_stream: false,
read_stream: false,
manage_topics: false,
read_topics: false,
poll_messages: false,
send_messages: false,
topics: None,
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("9:send_messages").unwrap(),
StreamPermissionsArg {
stream_id: 9,
permissions: StreamPermissions {
manage_stream: false,
read_stream: false,
manage_topics: false,
read_topics: false,
poll_messages: false,
send_messages: true,
topics: None,
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("9#1#2").unwrap(),
StreamPermissionsArg {
stream_id: 9,
permissions: StreamPermissions {
manage_stream: false,
read_stream: false,
manage_topics: false,
read_topics: false,
poll_messages: false,
send_messages: false,
topics: Some(AHashMap::from([
(
2,
TopicPermissions {
manage_topic: false,
read_topic: false,
poll_messages: false,
send_messages: false,
}
),
(
1,
TopicPermissions {
manage_topic: false,
read_topic: false,
poll_messages: false,
send_messages: false,
}
)
])),
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("4:manage_topics#1:manage_topic#2:manage_topic")
.unwrap(),
StreamPermissionsArg {
stream_id: 4,
permissions: StreamPermissions {
manage_stream: false,
read_stream: false,
manage_topics: true,
read_topics: false,
poll_messages: false,
send_messages: false,
topics: Some(AHashMap::from([
(
2,
TopicPermissions {
manage_topic: true,
read_topic: false,
poll_messages: false,
send_messages: false,
}
),
(
1,
TopicPermissions {
manage_topic: true,
read_topic: false,
poll_messages: false,
send_messages: false,
}
)
])),
}
}
);
}
#[test]
fn should_deserialize_short_permissions() {
assert_eq!(
StreamPermissionsArg::from_str("111:m_str,r_str,m_top,r_top,p_msg,s_msg").unwrap(),
StreamPermissionsArg {
stream_id: 111,
permissions: StreamPermissions {
manage_stream: true,
read_stream: true,
manage_topics: true,
read_topics: true,
poll_messages: true,
send_messages: true,
topics: None,
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("27:m_top,r_top").unwrap(),
StreamPermissionsArg {
stream_id: 27,
permissions: StreamPermissions {
manage_stream: false,
read_stream: false,
manage_topics: true,
read_topics: true,
poll_messages: false,
send_messages: false,
topics: None,
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("9:s_msg,r_top,r_str").unwrap(),
StreamPermissionsArg {
stream_id: 9,
permissions: StreamPermissions {
manage_stream: false,
read_stream: true,
manage_topics: false,
read_topics: true,
poll_messages: false,
send_messages: true,
topics: None,
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("4:s_msg").unwrap(),
StreamPermissionsArg {
stream_id: 4,
permissions: StreamPermissions {
manage_stream: false,
read_stream: false,
manage_topics: false,
read_topics: false,
poll_messages: false,
send_messages: true,
topics: None,
}
}
);
assert_eq!(
StreamPermissionsArg::from_str("6:m_top#1:m_top#2:m_top").unwrap(),
StreamPermissionsArg {
stream_id: 6,
permissions: StreamPermissions {
manage_stream: false,
read_stream: false,
manage_topics: true,
read_topics: false,
poll_messages: false,
send_messages: false,
topics: Some(AHashMap::from([
(
2,
TopicPermissions {
manage_topic: true,
read_topic: false,
poll_messages: false,
send_messages: false,
}
),
(
1,
TopicPermissions {
manage_topic: true,
read_topic: false,
poll_messages: false,
send_messages: false,
}
)
])),
}
}
);
}
#[test]
fn should_not_deserialize_permissions() {
let wrong_id = StreamPermissionsArg::from_str("4a");
assert!(wrong_id.is_err());
assert_eq!(
wrong_id.unwrap_err(),
"Invalid stream ID - invalid digit found in string"
);
let wrong_permission = StreamPermissionsArg::from_str("4:read_topic");
assert!(wrong_permission.is_err());
assert_eq!(
wrong_permission.unwrap_err(),
"Unknown permission \"read_topic\" for stream ID: 4"
);
let multiple_wrong = StreamPermissionsArg::from_str("55:read_topic,sent_messages");
assert!(multiple_wrong.is_err());
assert_eq!(
multiple_wrong.unwrap_err(),
"Unknown permissions \"read_topic\", \"sent_messages\" for stream ID: 55"
);
let multiple_wrong_with_topic =
StreamPermissionsArg::from_str("55:read_topic,sent_messages#4:reed_topic");
assert!(multiple_wrong_with_topic.is_err());
assert_eq!(
multiple_wrong_with_topic.unwrap_err(),
"Unknown permissions \"read_topic\", \"sent_messages\" for stream ID: 55; Unknown permission \"reed_topic\" for topic ID: 4"
);
}
#[test]
fn should_not_deserialize_short_permissions() {
let wrong_id = StreamPermissionsArg::from_str("1x");
assert!(wrong_id.is_err());
assert_eq!(
wrong_id.unwrap_err(),
"Invalid stream ID - invalid digit found in string"
);
let wrong_permission = StreamPermissionsArg::from_str("7:x_topic");
assert!(wrong_permission.is_err());
assert_eq!(
wrong_permission.unwrap_err(),
"Unknown permission \"x_topic\" for stream ID: 7"
);
let multiple_wrong = StreamPermissionsArg::from_str("37:w_top,s_msgs");
assert!(multiple_wrong.is_err());
assert_eq!(
multiple_wrong.unwrap_err(),
"Unknown permissions \"w_top\", \"s_msgs\" for stream ID: 37"
);
let multiple_wrong_with_topic = StreamPermissionsArg::from_str("11:r_to,s_msg#4:r_to");
assert!(multiple_wrong_with_topic.is_err());
assert_eq!(
multiple_wrong_with_topic.unwrap_err(),
"Unknown permission \"r_to\" for stream ID: 11; Unknown permission \"r_to\" for topic ID: 4"
);
let multiple_wrong_topics =
StreamPermissionsArg::from_str("11:r_top,s_msg#7:r_tox#5:w_top,s_top");
assert!(multiple_wrong_topics.is_err());
assert_eq!(
multiple_wrong_topics.unwrap_err(),
"Unknown permission \"r_tox\" for topic ID: 7; Unknown permissions \"w_top\", \"s_top\" for topic ID: 5"
);
}
}