blob: 17584d008d188cd76dac6d3c5bd557f657376235 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::error::{ClientError, ErrorKind};
use crate::model::message_id::UNIQ_ID_GENERATOR;
use std::collections::HashMap;
/// Accessor interface for the individual parts of a message.
///
/// Every method takes `&mut self` and returns an owned value, which allows an
/// implementation to hand a field over by moving it out instead of copying it.
pub trait Message {
/// Returns the message id as an owned `String`.
fn take_message_id(&mut self) -> String;
/// Returns the topic as an owned `String`.
fn take_topic(&mut self) -> String;
/// Returns the message body bytes.
fn take_body(&mut self) -> Vec<u8>;
/// Returns the tag, or `None` when there is none to return.
fn take_tag(&mut self) -> Option<String>;
/// Returns the message keys.
fn take_keys(&mut self) -> Vec<String>;
/// Returns the message properties as a key/value map.
fn take_properties(&mut self) -> HashMap<String, String>;
/// Returns the message group, or `None` when there is none to return.
fn take_message_group(&mut self) -> Option<String>;
/// Returns the delivery timestamp, or `None` when there is none to return.
// NOTE(review): the unit/epoch of the timestamp is not visible in this file — confirm.
fn take_delivery_timestamp(&mut self) -> Option<i64>;
}
/// Concrete [`Message`] implementation, assembled through [`MessageBuilder`].
///
/// The optional parts are stored as `Option` so the `take_*` accessors can
/// move them out and leave `None` behind.
#[derive(Debug)]
pub struct MessageImpl {
/// Id drawn from `UNIQ_ID_GENERATOR` when the builder is created.
pub(crate) message_id: String,
/// Destination topic; `MessageBuilder::build` rejects an empty value.
pub(crate) topic: String,
/// Payload bytes; `MessageBuilder::build` rejects `None`.
pub(crate) body: Option<Vec<u8>>,
/// Optional tag string, set through `MessageBuilder::set_tags`.
pub(crate) tags: Option<String>,
/// Optional list of keys.
pub(crate) keys: Option<Vec<String>>,
/// Optional key/value properties.
pub(crate) properties: Option<HashMap<String, String>>,
/// Optional message group; `build` rejects it together with `delivery_timestamp`.
pub(crate) message_group: Option<String>,
/// Optional delivery timestamp; `build` rejects it together with `message_group`.
// NOTE(review): unit/epoch is not visible in this file — confirm.
pub(crate) delivery_timestamp: Option<i64>,
}
/// [`Message`] accessors for [`MessageImpl`].
///
/// `message_id` and `topic` are cloned, so they can be read any number of
/// times. Every other accessor moves the stored value out and leaves `None`
/// behind, so a second call returns the empty/default value.
impl Message for MessageImpl {
    /// Returns a copy of the message id; the stored id is left in place.
    fn take_message_id(&mut self) -> String {
        self.message_id.clone()
    }

    /// Returns a copy of the topic; the stored topic is left in place.
    fn take_topic(&mut self) -> String {
        self.topic.clone()
    }

    /// Moves the body out, returning an empty vector when no body is stored.
    fn take_body(&mut self) -> Vec<u8> {
        // `unwrap_or_default` only builds the fallback when the field is `None`.
        self.body.take().unwrap_or_default()
    }

    /// Moves the tag out, returning `None` when no tag is stored.
    fn take_tag(&mut self) -> Option<String> {
        self.tags.take()
    }

    /// Moves the keys out, returning an empty vector when none are stored.
    fn take_keys(&mut self) -> Vec<String> {
        self.keys.take().unwrap_or_default()
    }

    /// Moves the properties out, returning an empty map when none are stored.
    fn take_properties(&mut self) -> HashMap<String, String> {
        // Avoids constructing a throw-away `HashMap` when properties are present.
        self.properties.take().unwrap_or_default()
    }

    /// Moves the message group out, returning `None` when none is stored.
    fn take_message_group(&mut self) -> Option<String> {
        self.message_group.take()
    }

    /// Moves the delivery timestamp out, returning `None` when none is stored.
    fn take_delivery_timestamp(&mut self) -> Option<i64> {
        self.delivery_timestamp.take()
    }
}
/// Entry points that hand out a [`MessageBuilder`].
impl MessageImpl {
    /// Starts a builder for a message with a freshly generated id, an empty
    /// topic, and every optional part unset.
    pub fn builder() -> MessageBuilder {
        let message_id = UNIQ_ID_GENERATOR.lock().next_id();
        let message = MessageImpl {
            message_id,
            topic: String::new(),
            body: None,
            tags: None,
            keys: None,
            properties: None,
            message_group: None,
            delivery_timestamp: None,
        };
        MessageBuilder { message }
    }

    /// Starts a builder pre-filled with `topic`, `body` and `message_group`;
    /// everything else is as in [`MessageImpl::builder`].
    pub fn fifo_message_builder(
        topic: String,
        body: Vec<u8>,
        message_group: String,
    ) -> MessageBuilder {
        Self::builder()
            .set_topic(topic)
            .set_body(body)
            .set_message_group(message_group)
    }

    /// Starts a builder pre-filled with `topic` and `body`, with `delay_time`
    /// stored unchanged as the delivery timestamp.
    // NOTE(review): despite the parameter name, the value is used as-is for
    // `delivery_timestamp`; presumably an absolute timestamp — confirm with callers.
    pub fn delay_message_builder(topic: String, body: Vec<u8>, delay_time: i64) -> MessageBuilder {
        Self::builder()
            .set_topic(topic)
            .set_body(body)
            .set_delivery_timestamp(delay_time)
    }
}
/// Builder that accumulates fields into a [`MessageImpl`].
///
/// Obtained from `MessageImpl::builder()`, `MessageImpl::fifo_message_builder()`
/// or `MessageImpl::delay_message_builder()`; `build()` validates the message
/// and returns it.
pub struct MessageBuilder {
// Message under construction; returned by `build()` once it passes validation.
message: MessageImpl,
}
/// Chainable setters plus the validating [`MessageBuilder::build`] step.
impl MessageBuilder {
    /// Operation label attached to errors produced by [`MessageBuilder::build`].
    const OPERATION_BUILD_MESSAGE: &'static str = "build_message";

    /// Runs `edit` against the message under construction and returns the builder.
    fn apply(mut self, edit: impl FnOnce(&mut MessageImpl)) -> Self {
        edit(&mut self.message);
        self
    }

    /// Sets the topic, replacing any previous value.
    pub fn set_topic(self, topic: String) -> Self {
        self.apply(|draft| draft.topic = topic)
    }

    /// Sets the body, replacing any previous value.
    pub fn set_body(self, body: Vec<u8>) -> Self {
        self.apply(|draft| draft.body = Some(body))
    }

    /// Sets the tag string, replacing any previous value.
    pub fn set_tags(self, tags: String) -> Self {
        self.apply(|draft| draft.tags = Some(tags))
    }

    /// Sets the keys, replacing any previous value.
    pub fn set_keys(self, keys: Vec<String>) -> Self {
        self.apply(|draft| draft.keys = Some(keys))
    }

    /// Sets the properties map, replacing any previous value.
    pub fn set_properties(self, properties: HashMap<String, String>) -> Self {
        self.apply(|draft| draft.properties = Some(properties))
    }

    /// Sets the message group, replacing any previous value.
    pub fn set_message_group(self, message_group: String) -> Self {
        self.apply(|draft| draft.message_group = Some(message_group))
    }

    /// Sets the delivery timestamp, replacing any previous value.
    pub fn set_delivery_timestamp(self, delivery_timestamp: i64) -> Self {
        self.apply(|draft| draft.delivery_timestamp = Some(delivery_timestamp))
    }

    /// Validates the message under construction.
    ///
    /// Checks run in this order and the first failure wins: empty topic,
    /// missing body, then message group combined with delivery timestamp.
    fn check_message(&self) -> Result<(), String> {
        let draft = &self.message;
        let problem = if draft.topic.is_empty() {
            "Topic is empty."
        } else if draft.body.is_none() {
            "Body is empty."
        } else if draft.message_group.is_some() && draft.delivery_timestamp.is_some() {
            "message_group and delivery_timestamp can not be set at the same time."
        } else {
            return Ok(());
        };
        Err(problem.to_string())
    }

    /// Consumes the builder and returns the finished message.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientError`] of kind [`ErrorKind::InvalidMessage`] carrying
    /// the validation message when `check_message` rejects the message.
    pub fn build(self) -> Result<MessageImpl, ClientError> {
        match self.check_message() {
            Ok(()) => Ok(self.message),
            Err(reason) => Err(ClientError::new(
                ErrorKind::InvalidMessage,
                &reason,
                Self::OPERATION_BUILD_MESSAGE,
            )),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Walks the builder through four scenarios: a fully populated message, the
    // rejected message_group + delivery_timestamp combination, and each of
    // those two options on its own.
    #[test]
    fn test_message() {
        // Every scenario starts from the same topic and body.
        let base = || {
            MessageImpl::builder()
                .set_topic("test".to_string())
                .set_body(vec![1, 2, 3])
        };
        let sample_properties = || {
            let mut map = HashMap::new();
            map.insert("key".to_string(), "value".to_string());
            map
        };

        // Fully populated message: builds and hands every part back.
        let built = base()
            .set_tags("tag".to_string())
            .set_keys(vec!["key".to_string()])
            .set_properties(sample_properties())
            .build();
        assert!(built.is_ok());
        let mut full = built.unwrap();
        assert_eq!(full.take_topic(), "test");
        assert_eq!(full.take_body(), vec![1, 2, 3]);
        assert_eq!(full.take_tag(), Some("tag".to_string()));
        assert_eq!(full.take_keys(), vec!["key".to_string()]);
        assert_eq!(full.take_properties(), sample_properties());

        // Message group and delivery timestamp together must be rejected.
        let conflicting = base()
            .set_message_group("message_group".to_string())
            .set_delivery_timestamp(123456789)
            .build();
        assert!(conflicting.is_err());
        let err = conflicting.unwrap_err();
        assert_eq!(err.kind, ErrorKind::InvalidMessage);
        assert_eq!(
            err.message,
            "message_group and delivery_timestamp can not be set at the same time."
        );

        // Message group on its own is accepted.
        let mut grouped = base()
            .set_message_group("message_group".to_string())
            .build()
            .unwrap();
        assert_eq!(
            grouped.take_message_group(),
            Some("message_group".to_string())
        );

        // Delivery timestamp on its own is accepted.
        let mut delayed = base().set_delivery_timestamp(123456789).build().unwrap();
        assert_eq!(delayed.take_delivery_timestamp(), Some(123456789));
    }
}