| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| use std::hash::Hasher; |
| use std::sync::atomic::Ordering; |
| use std::sync::Arc; |
| |
| use crate::conf::{ClientOption, ProducerOption, SimpleConsumerOption}; |
| use siphasher::sip::SipHasher24; |
| |
| use crate::error::{ClientError, ErrorKind}; |
| use crate::model::common::{Endpoints, Route}; |
| use crate::pb::settings::PubSub; |
| use crate::pb::telemetry_command::Command; |
| use crate::pb::{ |
| Language, MessageQueue, Publishing, Resource, Settings, Subscription, TelemetryCommand, Ua, |
| }; |
| |
| pub(crate) static SDK_LANGUAGE: Language = Language::Rust; |
| pub(crate) static SDK_VERSION: &str = "5.0.0"; |
| pub(crate) static PROTOCOL_VERSION: &str = "2.0.0"; |
| |
| lazy_static::lazy_static! { |
| pub(crate) static ref HOST_NAME: String = match hostname::get() { |
| Ok(name) => name.to_str().unwrap_or("localhost").to_string(), |
| Err(_) => "localhost".to_string(), |
| }; |
| } |
| |
| pub(crate) fn select_message_queue(route: Arc<Route>) -> MessageQueue { |
| let i = route.index.fetch_add(1, Ordering::Relaxed); |
| route.queue[i % route.queue.len()].clone() |
| } |
| |
| pub(crate) fn select_message_queue_by_message_group( |
| route: Arc<Route>, |
| message_group: String, |
| ) -> MessageQueue { |
| let mut sip_hasher24 = SipHasher24::default(); |
| sip_hasher24.write(message_group.as_bytes()); |
| let index = sip_hasher24.finish() % route.queue.len() as u64; |
| route.queue[index as usize].clone() |
| } |
| |
| pub(crate) fn build_endpoints_by_message_queue( |
| message_queue: &MessageQueue, |
| operation: &'static str, |
| ) -> Result<Endpoints, ClientError> { |
| let topic = message_queue.topic.clone().unwrap().name; |
| if message_queue.broker.is_none() { |
| return Err(ClientError::new( |
| ErrorKind::NoBrokerAvailable, |
| "message queue do not have a available endpoint", |
| operation, |
| ) |
| .with_context("topic", topic) |
| .with_context("queue_id", message_queue.id.to_string())); |
| } |
| |
| let broker = message_queue.broker.clone().unwrap(); |
| if broker.endpoints.is_none() { |
| return Err(ClientError::new( |
| ErrorKind::NoBrokerAvailable, |
| "message queue do not have a available endpoint", |
| operation, |
| ) |
| .with_context("broker", broker.name) |
| .with_context("topic", topic) |
| .with_context("queue_id", message_queue.id.to_string())); |
| } |
| |
| Ok(Endpoints::from_pb_endpoints(broker.endpoints.unwrap())) |
| } |
| |
| pub(crate) fn build_producer_settings( |
| option: &ProducerOption, |
| client_options: &ClientOption, |
| ) -> TelemetryCommand { |
| let topics = option |
| .topics() |
| .clone() |
| .unwrap_or(vec![]) |
| .iter() |
| .map(|topic| Resource { |
| name: topic.to_string(), |
| resource_namespace: option.namespace().to_string(), |
| }) |
| .collect(); |
| let platform = os_type::current_platform(); |
| TelemetryCommand { |
| command: Some(Command::Settings(Settings { |
| client_type: Some(client_options.client_type.clone() as i32), |
| request_timeout: Some(prost_types::Duration { |
| seconds: client_options.timeout().as_secs() as i64, |
| nanos: client_options.timeout().subsec_nanos() as i32, |
| }), |
| pub_sub: Some(PubSub::Publishing(Publishing { |
| topics, |
| validate_message_type: option.validate_message_type(), |
| ..Publishing::default() |
| })), |
| user_agent: Some(Ua { |
| language: SDK_LANGUAGE as i32, |
| version: SDK_VERSION.to_string(), |
| platform: format!("{:?} {}", platform.os_type, platform.version), |
| hostname: HOST_NAME.clone(), |
| }), |
| ..Settings::default() |
| })), |
| ..TelemetryCommand::default() |
| } |
| } |
| |
| pub(crate) fn build_simple_consumer_settings( |
| option: &SimpleConsumerOption, |
| client_option: &ClientOption, |
| ) -> TelemetryCommand { |
| let platform = os_type::current_platform(); |
| TelemetryCommand { |
| command: Some(Command::Settings(Settings { |
| client_type: Some(client_option.client_type.clone() as i32), |
| request_timeout: Some(prost_types::Duration { |
| seconds: client_option.timeout().as_secs() as i64, |
| nanos: client_option.timeout().subsec_nanos() as i32, |
| }), |
| pub_sub: Some(PubSub::Subscription(Subscription { |
| group: Some(Resource { |
| name: option.consumer_group().to_string(), |
| resource_namespace: option.namespace().to_string(), |
| }), |
| subscriptions: vec![], |
| fifo: Some(false), |
| receive_batch_size: None, |
| long_polling_timeout: Some(prost_types::Duration { |
| seconds: client_option.long_polling_timeout().as_secs() as i64, |
| nanos: client_option.long_polling_timeout().subsec_nanos() as i32, |
| }), |
| })), |
| user_agent: Some(Ua { |
| language: SDK_LANGUAGE as i32, |
| version: SDK_VERSION.to_string(), |
| platform: format!("{:?} {}", platform.os_type, platform.version), |
| hostname: HOST_NAME.clone(), |
| }), |
| ..Settings::default() |
| })), |
| ..TelemetryCommand::default() |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use crate::model::common::Route;
    use crate::pb;
    use crate::pb::{Broker, MessageQueue};
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    use super::*;

    // Builds a route holding two bare queues (ids 1 and 2) with the
    // round-robin cursor starting at zero.
    fn build_route() -> Arc<Route> {
        let bare_queue = |id| MessageQueue {
            topic: None,
            id,
            permission: 0,
            broker: None,
            accept_message_types: vec![],
        };

        Arc::new(Route {
            index: AtomicUsize::new(0),
            queue: vec![bare_queue(1), bare_queue(2)],
        })
    }

    // Round-robin selection cycles through the queues and wraps around.
    #[test]
    fn util_select_message_queue() {
        let route = build_route();
        for &expected_id in [1, 2, 1].iter() {
            let message_queue = select_message_queue(Arc::clone(&route));
            assert_eq!(message_queue.id, expected_id);
        }
    }

    // The same message group always maps to the same queue.
    #[test]
    fn util_select_message_queue_by_message_group() {
        let route = build_route();
        let cases = [("group1", 1), ("group1", 1), ("another_group", 2)];
        for &(group, expected_id) in cases.iter() {
            let message_queue =
                select_message_queue_by_message_group(Arc::clone(&route), group.to_string());
            assert_eq!(message_queue.id, expected_id);
        }
    }

    // Checks the success path, then a broker without endpoints, then a queue
    // without any broker.
    #[test]
    fn util_build_endpoints_by_message_queue() {
        // Asserts that `result` is the "no broker available" error carrying
        // `context_len` context entries.
        fn assert_no_broker_available(result: Result<Endpoints, ClientError>, context_len: usize) {
            assert!(result.is_err());
            let error = result.unwrap_err();
            assert_eq!(error.kind, ErrorKind::NoBrokerAvailable);
            assert_eq!(error.operation, "test");
            assert_eq!(
                error.message,
                "message queue do not have a available endpoint"
            );
            assert_eq!(error.context.len(), context_len);
        }

        let broker_with_endpoints = Broker {
            name: "".to_string(),
            id: 0,
            endpoints: Some(pb::Endpoints {
                scheme: pb::AddressScheme::DomainName as i32,
                addresses: vec![],
            }),
        };
        let mut message_queue = MessageQueue {
            topic: Some(Resource {
                name: "topic".to_string(),
                resource_namespace: "".to_string(),
            }),
            id: 1,
            permission: 0,
            broker: Some(broker_with_endpoints),
            accept_message_types: vec![],
        };

        let result = build_endpoints_by_message_queue(&message_queue, "test");
        assert!(result.is_ok());
        assert_eq!(result.unwrap().scheme(), pb::AddressScheme::DomainName);

        // Broker present but without endpoints: broker, topic and queue id
        // are reported.
        message_queue.broker = Some(Broker {
            name: "".to_string(),
            id: 0,
            endpoints: None,
        });
        assert_no_broker_available(
            build_endpoints_by_message_queue(&message_queue, "test"),
            3,
        );

        // No broker at all: only topic and queue id are reported.
        message_queue.broker.take();
        assert_no_broker_available(
            build_endpoints_by_message_queue(&message_queue, "test"),
            2,
        );
    }

    // Smoke test: building producer settings from defaults must not panic.
    #[test]
    fn util_build_producer_settings() {
        let producer_option = ProducerOption::default();
        let client_option = ClientOption::default();
        build_producer_settings(&producer_option, &client_option);
    }

    // Smoke test: building simple consumer settings from defaults must not panic.
    #[test]
    fn util_build_simple_consumer_settings() {
        let consumer_option = SimpleConsumerOption::default();
        let client_option = ClientOption::default();
        build_simple_consumer_settings(&consumer_option, &client_option);
    }
}