| use crate::bytes_serializable::BytesSerializable; |
| use crate::error::IggyError; |
| use bytes::{Buf, BufMut, Bytes, BytesMut}; |
| use serde::{Deserialize, Serialize}; |
| use std::collections::HashMap; |
| use std::fmt::Display; |
| |
| /// `Permissions` is used to define the permissions of a user. |
| /// It consists of global permissions and stream permissions. |
| /// Global permissions are applied to all streams. |
| /// Stream permissions are applied to a specific stream. |
| #[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)] |
| pub struct Permissions { |
| /// Global permissions are applied to all streams. |
| pub global: GlobalPermissions, |
| |
| /// Stream permissions are applied to a specific stream. |
| pub streams: Option<HashMap<u32, StreamPermissions>>, |
| } |
| |
| /// `GlobalPermissions` are applied to all streams without a need to specify them one by one in the `streams` field. |
| #[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)] |
| pub struct GlobalPermissions { |
| /// `manage_servers` permission allows to manage the servers and includes all the permissions of `read_servers`. |
| pub manage_servers: bool, |
| |
| /// `read_servers` permission allows to invoke the following methods: |
| /// - get_stats |
| /// - get_clients |
| /// - get_client |
| pub read_servers: bool, |
| |
| /// `manage_users` permission allows to manage the users and includes all the permissions of `read_users`. |
| /// Additionally, the following methods can be invoked: |
| /// - create_user |
| /// - update_user |
| /// - delete_user |
| /// - update_permissions |
| /// - change_password |
| pub manage_users: bool, |
| |
| /// `read_users` permission allows to invoke the following methods: |
| /// - get_user |
| /// - get_users |
| pub read_users: bool, |
| |
| /// `manage_streams` permission allows to manage the streams and includes all the permissions of `read_streams`. |
| /// Also, it allows to manage all the topics of a stream, thus it has all the permissions of `manage_topics`. |
| /// Additionally, the following methods can be invoked: |
| /// - create_stream |
| /// - update_stream |
| /// - delete_stream |
| pub manage_streams: bool, |
| |
| /// `read_streams` permission allows to read the streams and includes all the permissions of `read_topics`. |
| /// Additionally, the following methods can be invoked: |
| /// - get_stream |
| /// - get_streams |
| pub read_streams: bool, |
| |
| /// `manage_topics` permission allows to manage the topics and includes all the permissions of `read_topics`. |
| /// Also, it allows to manage all the partitions of a topic, thus it has all the permissions of `manage_topic`. |
| /// Additionally, the following methods can be invoked: |
| /// - create_topic |
| /// - update_topic |
| /// - delete_topic |
| pub manage_topics: bool, |
| |
| /// `read_topics` permission allows to read the topics and includes all the permissions of `read_messages`. |
| /// Additionally, the following methods can be invoked: |
| /// - get_topic |
| /// - get_topics |
| pub read_topics: bool, |
| |
| /// `poll_messages` permission allows to poll messages from all the streams and theirs topics. |
| pub poll_messages: bool, |
| |
| /// `send_messages` permission allows to send messages to all the streams and theirs topics. |
| pub send_messages: bool, |
| } |
| |
| /// `StreamPermissions` are applied to a specific stream and its all topics. If you want to define granular permissions for each topic, use the `topics` field. |
| /// These permissions do not override the global permissions, but extend them, and allow more granular control over the streams and the users that can access them. |
| #[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)] |
| pub struct StreamPermissions { |
| /// `manage_stream` permission allows to manage the stream and includes all the permissions of `read_stream`. |
| /// Also, it allows to manage all the topics of a stream, thus it has all the permissions of `manage_topics`. |
| /// Additionally, the following methods can be invoked: |
| /// - create_stream |
| /// - update_stream |
| /// - delete_stream |
| pub manage_stream: bool, |
| |
| /// `read_stream` permission allows to read the stream and includes all the permissions of `read_topics`. |
| /// Also, it allows to read all the messages of a topic, thus it has all the permissions of `read_messages`. |
| /// Additionally, the following methods can be invoked: |
| /// - get_stream |
| /// - get_streams |
| pub read_stream: bool, |
| |
| /// `manage_topics` permission allows to manage the topics and includes all the permissions of `read_topics`. |
| /// Also, it allows to manage all the partitions of a topic, thus it has all the permissions of `manage_topic`. |
| /// Additionally, the following methods can be invoked: |
| /// - create_topic |
| /// - update_topic |
| /// - delete_topic |
| pub manage_topics: bool, |
| |
| /// `read_topics` permission allows to read the topics and includes all the permissions of `read_messages`. |
| pub read_topics: bool, |
| |
| /// `poll_messages` permission allows to poll messages from the stream and its topics. |
| pub poll_messages: bool, |
| |
| /// `send_messages` permission allows to send messages to the stream and its topics. |
| pub send_messages: bool, |
| |
| /// The `topics` field allows to define the granular permissions for each topic of a stream. |
| pub topics: Option<HashMap<u32, TopicPermissions>>, |
| } |
| |
| /// `TopicPermissions` are applied to a specific topic of a stream. This is the lowest level of permissions. |
| #[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)] |
| pub struct TopicPermissions { |
| /// `manage_topic` permission allows to manage the topic and includes all the permissions of `read_topic`. |
| pub manage_topic: bool, |
| |
| /// `read_topic` permission allows to read the topic and includes all the permissions of `read_messages`. |
| pub read_topic: bool, |
| |
| /// `poll_messages` permission allows to poll messages from the topic. |
| pub poll_messages: bool, |
| |
| /// `send_messages` permission allows to send messages to the topic. |
| pub send_messages: bool, |
| } |
| |
| impl Permissions { |
| pub fn root() -> Self { |
| Self { |
| global: GlobalPermissions { |
| manage_servers: true, |
| read_servers: true, |
| manage_users: true, |
| read_users: true, |
| manage_streams: true, |
| read_streams: true, |
| manage_topics: true, |
| read_topics: true, |
| poll_messages: true, |
| send_messages: true, |
| }, |
| streams: None, |
| } |
| } |
| } |
| |
| impl Display for Permissions { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| let mut result = String::new(); |
| result.push_str(&format!("manage_servers: {}\n", self.global.manage_servers)); |
| result.push_str(&format!("read_servers: {}\n", self.global.read_servers)); |
| result.push_str(&format!("manage_users: {}\n", self.global.manage_users)); |
| result.push_str(&format!("read_users: {}\n", self.global.read_users)); |
| result.push_str(&format!("manage_streams: {}\n", self.global.manage_streams)); |
| result.push_str(&format!("read_streams: {}\n", self.global.read_streams)); |
| result.push_str(&format!("manage_topics: {}\n", self.global.manage_topics)); |
| result.push_str(&format!("read_topics: {}\n", self.global.read_topics)); |
| result.push_str(&format!("poll_messages: {}\n", self.global.poll_messages)); |
| result.push_str(&format!("send_messages: {}\n", self.global.send_messages)); |
| if let Some(streams) = &self.streams { |
| for (stream_id, stream) in streams { |
| result.push_str(&format!("stream_id: {}\n", stream_id)); |
| result.push_str(&format!("manage_stream: {}\n", stream.manage_stream)); |
| result.push_str(&format!("read_stream: {}\n", stream.read_stream)); |
| result.push_str(&format!("manage_topics: {}\n", stream.manage_topics)); |
| result.push_str(&format!("read_topics: {}\n", stream.read_topics)); |
| result.push_str(&format!("poll_messages: {}\n", stream.poll_messages)); |
| result.push_str(&format!("send_messages: {}\n", stream.send_messages)); |
| if let Some(topics) = &stream.topics { |
| for (topic_id, topic) in topics { |
| result.push_str(&format!("topic_id: {}\n", topic_id)); |
| result.push_str(&format!("manage_topic: {}\n", topic.manage_topic)); |
| result.push_str(&format!("read_topic: {}\n", topic.read_topic)); |
| result.push_str(&format!("poll_messages: {}\n", topic.poll_messages)); |
| result.push_str(&format!("send_messages: {}\n", topic.send_messages)); |
| } |
| } |
| } |
| } |
| |
| write!(f, "{}", result) |
| } |
| } |
| |
| impl BytesSerializable for Permissions { |
| fn as_bytes(&self) -> Bytes { |
| let mut bytes = BytesMut::new(); |
| bytes.put_u8(if self.global.manage_servers { 1 } else { 0 }); |
| bytes.put_u8(if self.global.read_servers { 1 } else { 0 }); |
| bytes.put_u8(if self.global.manage_users { 1 } else { 0 }); |
| bytes.put_u8(if self.global.read_users { 1 } else { 0 }); |
| bytes.put_u8(if self.global.manage_streams { 1 } else { 0 }); |
| bytes.put_u8(if self.global.read_streams { 1 } else { 0 }); |
| bytes.put_u8(if self.global.manage_topics { 1 } else { 0 }); |
| bytes.put_u8(if self.global.read_topics { 1 } else { 0 }); |
| bytes.put_u8(if self.global.poll_messages { 1 } else { 0 }); |
| bytes.put_u8(if self.global.send_messages { 1 } else { 0 }); |
| if let Some(streams) = &self.streams { |
| bytes.put_u8(1); |
| let streams_count = streams.len(); |
| let mut current_stream = 1; |
| for (stream_id, stream) in streams { |
| bytes.put_u32_le(*stream_id); |
| bytes.put_u8(if stream.manage_stream { 1 } else { 0 }); |
| bytes.put_u8(if stream.read_stream { 1 } else { 0 }); |
| bytes.put_u8(if stream.manage_topics { 1 } else { 0 }); |
| bytes.put_u8(if stream.read_topics { 1 } else { 0 }); |
| bytes.put_u8(if stream.poll_messages { 1 } else { 0 }); |
| bytes.put_u8(if stream.send_messages { 1 } else { 0 }); |
| if let Some(topics) = &stream.topics { |
| bytes.put_u8(1); |
| let topics_count = topics.len(); |
| let mut current_topic = 1; |
| for (topic_id, topic) in topics { |
| bytes.put_u32_le(*topic_id); |
| bytes.put_u8(if topic.manage_topic { 1 } else { 0 }); |
| bytes.put_u8(if topic.read_topic { 1 } else { 0 }); |
| bytes.put_u8(if topic.poll_messages { 1 } else { 0 }); |
| bytes.put_u8(if topic.send_messages { 1 } else { 0 }); |
| if current_topic < topics_count { |
| current_topic += 1; |
| bytes.put_u8(1); |
| } else { |
| bytes.put_u8(0); |
| } |
| } |
| } else { |
| bytes.put_u8(0); |
| } |
| if current_stream < streams_count { |
| current_stream += 1; |
| bytes.put_u8(1); |
| } else { |
| bytes.put_u8(0); |
| } |
| } |
| } else { |
| bytes.put_u8(0); |
| } |
| bytes.freeze() |
| } |
| |
| fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> |
| where |
| Self: Sized, |
| { |
| let mut bytes = bytes; |
| let manage_servers = bytes.get_u8() == 1; |
| let read_servers = bytes.get_u8() == 1; |
| let manage_users = bytes.get_u8() == 1; |
| let read_users = bytes.get_u8() == 1; |
| let manage_streams = bytes.get_u8() == 1; |
| let read_streams = bytes.get_u8() == 1; |
| let manage_topics = bytes.get_u8() == 1; |
| let read_topics = bytes.get_u8() == 1; |
| let poll_messages = bytes.get_u8() == 1; |
| let send_messages = bytes.get_u8() == 1; |
| let mut streams = None; |
| if bytes.get_u8() == 1 { |
| let mut streams_map = HashMap::new(); |
| loop { |
| let stream_id = bytes.get_u32_le(); |
| let manage_stream = bytes.get_u8() == 1; |
| let read_stream = bytes.get_u8() == 1; |
| let manage_topics = bytes.get_u8() == 1; |
| let read_topics = bytes.get_u8() == 1; |
| let poll_messages = bytes.get_u8() == 1; |
| let send_messages = bytes.get_u8() == 1; |
| let mut topics = None; |
| if bytes.get_u8() == 1 { |
| let mut topics_map = HashMap::new(); |
| loop { |
| let topic_id = bytes.get_u32_le(); |
| let manage_topic = bytes.get_u8() == 1; |
| let read_topic = bytes.get_u8() == 1; |
| let poll_messages = bytes.get_u8() == 1; |
| let send_messages = bytes.get_u8() == 1; |
| topics_map.insert( |
| topic_id, |
| TopicPermissions { |
| manage_topic, |
| read_topic, |
| poll_messages, |
| send_messages, |
| }, |
| ); |
| if bytes.get_u8() == 0 { |
| break; |
| } |
| } |
| topics = Some(topics_map); |
| } |
| streams_map.insert( |
| stream_id, |
| StreamPermissions { |
| manage_stream, |
| read_stream, |
| manage_topics, |
| read_topics, |
| poll_messages, |
| send_messages, |
| topics, |
| }, |
| ); |
| if bytes.get_u8() == 0 { |
| break; |
| } |
| } |
| streams = Some(streams_map); |
| } |
| Ok(Self { |
| global: GlobalPermissions { |
| manage_servers, |
| read_servers, |
| manage_users, |
| read_users, |
| manage_streams, |
| read_streams, |
| manage_topics, |
| read_topics, |
| poll_messages, |
| send_messages, |
| }, |
| streams, |
| }) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| #[test] |
| fn should_be_serialized_and_deserialized_from_bytes() { |
| let permissions = Permissions { |
| global: GlobalPermissions { |
| manage_servers: true, |
| read_servers: true, |
| manage_users: true, |
| read_users: true, |
| manage_streams: false, |
| read_streams: true, |
| manage_topics: false, |
| read_topics: true, |
| poll_messages: true, |
| send_messages: true, |
| }, |
| streams: Some(HashMap::from([ |
| ( |
| 1, |
| StreamPermissions { |
| manage_stream: true, |
| read_stream: true, |
| manage_topics: true, |
| read_topics: true, |
| poll_messages: true, |
| send_messages: true, |
| topics: Some(HashMap::from([ |
| ( |
| 1, |
| TopicPermissions { |
| manage_topic: true, |
| read_topic: true, |
| poll_messages: true, |
| send_messages: true, |
| }, |
| ), |
| ( |
| 2, |
| TopicPermissions { |
| manage_topic: true, |
| read_topic: false, |
| poll_messages: true, |
| send_messages: false, |
| }, |
| ), |
| ])), |
| }, |
| ), |
| ( |
| 2, |
| StreamPermissions { |
| manage_stream: false, |
| read_stream: true, |
| manage_topics: false, |
| read_topics: true, |
| poll_messages: true, |
| send_messages: true, |
| topics: None, |
| }, |
| ), |
| ])), |
| }; |
| |
| let bytes = permissions.as_bytes(); |
| let deserialized_permissions = Permissions::from_bytes(bytes).unwrap(); |
| |
| assert_eq!(permissions, deserialized_permissions); |
| } |
| } |