[ISSUE #526] feat(rust): support ak/sk authorization (#527)
* feat(rust): support ak/sk authorization
Signed-off-by: SSpirits <admin@lv5.moe>
* feat(rust): change ak/sk type to option
Signed-off-by: SSpirits <admin@lv5.moe>
* Make code idiomatic
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
* feat(rust): fix license
Signed-off-by: SSpirits <admin@lv5.moe>
* feat(rust): optimize code
Signed-off-by: SSpirits <admin@lv5.moe>
* feat(rust): fix msrv test
Signed-off-by: SSpirits <admin@lv5.moe>
* fix(rust): fix msrv test
Signed-off-by: SSpirits <admin@lv5.moe>
---------
Signed-off-by: SSpirits <admin@lv5.moe>
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
Co-authored-by: Li Zhanhui <lizhanhui@gmail.com>
diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml
index b7e4350..6689dd7 100644
--- a/.github/workflows/rust_build.yml
+++ b/.github/workflows/rust_build.yml
@@ -65,7 +65,7 @@
toolchain: ${{ matrix.msrv }}
- name: Check MSRV ${{ matrix.msrv }}
working-directory: ./rust
- run: cp .cargo/Cargo.lock.min Cargo.lock && cargo fetch && cargo +${{ matrix.msrv }} check --locked --frozen
+ run: cp .cargo/Cargo.lock.min Cargo.lock && cargo +${{ matrix.msrv }} fetch && cargo +${{ matrix.msrv }} check --locked --frozen
build:
name: "${{ matrix.os }}"
runs-on: ${{ matrix.os }}
diff --git a/rust/.cargo/Cargo.lock.min b/rust/.cargo/Cargo.lock.min
index 9262a08..3b2ccb8 100644
--- a/rust/.cargo/Cargo.lock.min
+++ b/rust/.cargo/Cargo.lock.min
@@ -137,15 +137,6 @@
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
-name = "block-buffer"
-version = "0.10.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
-dependencies = [
- "generic-array",
-]
-
-[[package]]
name = "bumpalo"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -211,16 +202,6 @@
]
[[package]]
-name = "crypto-common"
-version = "0.1.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
-dependencies = [
- "generic-array",
- "typenum",
-]
-
-[[package]]
name = "ctor"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -250,17 +231,6 @@
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
-name = "digest"
-version = "0.10.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
-dependencies = [
- "block-buffer",
- "crypto-common",
- "subtle",
-]
-
-[[package]]
name = "dirs-next"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -440,16 +410,6 @@
]
[[package]]
-name = "generic-array"
-version = "0.14.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
-dependencies = [
- "typenum",
- "version_check",
-]
-
-[[package]]
name = "getrandom"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -522,15 +482,6 @@
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
-name = "hmac"
-version = "0.12.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
-dependencies = [
- "digest",
-]
-
-[[package]]
name = "hostname"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1315,7 +1266,7 @@
[[package]]
name = "rocketmq"
-version = "0.1.0"
+version = "0.1.1"
dependencies = [
"anyhow",
"async-trait",
@@ -1323,7 +1274,6 @@
"byteorder",
"futures",
"hex",
- "hmac",
"hostname",
"lazy_static",
"mac_address",
@@ -1338,6 +1288,7 @@
"prost 0.11.9",
"prost-types",
"regex",
+ "ring",
"siphasher",
"slog",
"slog-async",
@@ -1579,12 +1530,6 @@
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
-name = "subtle"
-version = "2.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
-
-[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1975,12 +1920,6 @@
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
-name = "typenum"
-version = "1.16.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
-
-[[package]]
name = "unicode-ident"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml
new file mode 100644
index 0000000..311a028
--- /dev/null
+++ b/rust/.cargo/config.toml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[registries.crates-io]
+protocol = "sparse"
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 80c06fe..d7655d1 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -43,7 +43,6 @@
thiserror = "1.0"
anyhow = "1.0.70"
parking_lot = "0.12"
-hmac = "0.12"
hostname = "0.3.1"
os_type = "2.6.0"
@@ -67,6 +66,7 @@
mockall_double= "0.3.0"
siphasher = "0.3.10"
+ring = "0.16.20"
[build-dependencies]
tonic-build = "0.9.0"
diff --git a/rust/src/client.rs b/rust/src/client.rs
index f13c3a6..4b601fa 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -372,15 +372,10 @@
mut rpc_client: T,
messages: Vec<Message>,
) -> Result<Vec<SendReceipt>, ClientError> {
- let message_count = messages.len();
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;
- if response.entries.len() != message_count {
- error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
- }
-
Ok(response
.entries
.iter()
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 7ecb691..95b7adc 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -17,12 +17,13 @@
//! Configuration of RocketMQ rust client.
+use std::time::Duration;
+
use crate::model::common::ClientType;
#[allow(unused_imports)]
use crate::producer::Producer;
#[allow(unused_imports)]
use crate::simple_consumer::SimpleConsumer;
-use std::time::Duration;
/// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy.
#[derive(Debug, Clone)]
@@ -34,6 +35,8 @@
pub(crate) enable_tls: bool,
pub(crate) timeout: Duration,
pub(crate) long_polling_timeout: Duration,
+ pub(crate) access_key: Option<String>,
+ pub(crate) secret_key: Option<String>,
}
impl Default for ClientOption {
@@ -46,6 +49,8 @@
enable_tls: true,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
+ access_key: None,
+ secret_key: None,
}
}
}
@@ -88,6 +93,24 @@
pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
self.long_polling_timeout = long_polling_timeout;
}
+
+ /// Get the access key
+ pub fn access_key(&self) -> Option<&String> {
+ self.access_key.as_ref()
+ }
+ /// Set the access key
+ pub fn set_access_key(&mut self, access_key: impl Into<String>) {
+ self.access_key = Some(access_key.into());
+ }
+
+ /// Get the secret key
+ pub fn secret_key(&self) -> Option<&String> {
+ self.secret_key.as_ref()
+ }
+ /// Set the secret key
+ pub fn set_secret_key(&mut self, secret_key: impl Into<String>) {
+ self.secret_key = Some(secret_key.into());
+ }
}
/// Log format for output.
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 0660931..229d9e5 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -18,7 +18,10 @@
use async_trait::async_trait;
use mockall::automock;
+use ring::hmac;
use slog::{debug, error, info, o, Logger};
+use time::format_description::well_known::Rfc3339;
+use time::OffsetDateTime;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
@@ -217,6 +220,39 @@
"x-mq-protocol-version",
AsciiMetadataValue::from_static(PROTOCOL_VERSION),
);
+
+ let date_time_result = OffsetDateTime::now_local();
+ let date_time = if let Ok(result) = date_time_result {
+ result
+ } else {
+ OffsetDateTime::now_utc()
+ };
+
+ let date_time = date_time.format(&Rfc3339).unwrap();
+
+ metadata.insert(
+ "x-mq-date-time",
+ AsciiMetadataValue::try_from(&date_time).unwrap(),
+ );
+
+ if let Some((access_key, access_secret)) =
+ self.option.access_key().zip(self.option.secret_key())
+ {
+ let key = hmac::Key::new(
+ hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
+ access_secret.as_bytes(),
+ );
+ let signature = hmac::sign(&key, date_time.as_bytes());
+ let signature = hex::encode(signature.as_ref());
+ let authorization = format!(
+ "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}",
+ access_key, signature
+ );
+ metadata.insert(
+ "authorization",
+ AsciiMetadataValue::try_from(authorization).unwrap(),
+ );
+ }
}
pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
@@ -458,10 +494,10 @@
#[cfg(test)]
mod tests {
- use crate::conf::ProducerOption;
use slog::debug;
use wiremock_grpc::generate;
+ use crate::conf::ProducerOption;
use crate::log::terminal_logger;
use crate::util::build_producer_settings;