[ISSUE #575] Gracefully shutdown for Rust client (#576)
* feat(rust): add gracefully shutdown for rust sdk
Signed-off-by: SSpirits <admin@lv5.moe>
* fix(rust): fix test
Signed-off-by: SSpirits <admin@lv5.moe>
* fix(rust): apply reviewer changes
Signed-off-by: SSpirits <admin@lv5.moe>
---------
Signed-off-by: SSpirits <admin@lv5.moe>
diff --git a/rust/examples/delay_producer.rs b/rust/examples/delay_producer.rs
index 3e237c3..cca3bae 100644
--- a/rust/examples/delay_producer.rs
+++ b/rust/examples/delay_producer.rs
@@ -23,8 +23,8 @@
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // producer will prefetch topic route when start and failed fast if topic not exist
+ // It's recommended to specify the topics that applications will publish messages to
+ // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["delay_test"]);
@@ -34,7 +34,11 @@
// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
- producer.start().await.unwrap();
+ let start_result = producer.start().await;
+ if start_result.is_err() {
+ eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+ return;
+ }
// build message
let message = MessageBuilder::delay_message_builder(
@@ -51,10 +55,23 @@
.unwrap();
// send message to rocketmq proxy
- let result = producer.send(message).await;
- debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+ let send_result = producer.send(message).await;
+ if send_result.is_err() {
+ eprintln!("send message failed: {:?}", send_result.unwrap_err());
+ return;
+ }
println!(
"send message success, message_id={}",
- result.unwrap().message_id()
+ send_result.unwrap().message_id()
);
+
+ // shutdown the producer when you don't need it anymore.
+ // recommend shutdown manually to gracefully stop and unregister from server
+ let shutdown_result = producer.shutdown().await;
+ if shutdown_result.is_err() {
+ eprintln!(
+ "producer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/examples/fifo_producer.rs b/rust/examples/fifo_producer.rs
index 38562e2..211ae68 100644
--- a/rust/examples/fifo_producer.rs
+++ b/rust/examples/fifo_producer.rs
@@ -20,8 +20,8 @@
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // producer will prefetch topic route when start and failed fast if topic not exist
+ // It's recommended to specify the topics that applications will publish messages to
+ // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["fifo_test"]);
@@ -31,7 +31,11 @@
// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
- producer.start().await.unwrap();
+ let start_result = producer.start().await;
+ if start_result.is_err() {
+ eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+ return;
+ }
// build message
let message = MessageBuilder::fifo_message_builder(
@@ -44,10 +48,23 @@
.unwrap();
// send message to rocketmq proxy
- let result = producer.send(message).await;
- debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+ let send_result = producer.send(message).await;
+ if send_result.is_err() {
+ eprintln!("send message failed: {:?}", send_result.unwrap_err());
+ return;
+ }
println!(
"send message success, message_id={}",
- result.unwrap().message_id()
+ send_result.unwrap().message_id()
);
+
+ // shutdown the producer when you don't need it anymore.
+ // you should shutdown it manually to gracefully stop and unregister from server
+ let shutdown_result = producer.shutdown().await;
+ if shutdown_result.is_err() {
+ eprintln!(
+ "producer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index 335b31f..3e818e9 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -20,8 +20,8 @@
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // producer will prefetch topic route when start and failed fast if topic not exist
+ // It's recommended to specify the topics that applications will publish messages to
+ // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["test_topic"]);
@@ -31,7 +31,11 @@
// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
- producer.start().await.unwrap();
+ let start_result = producer.start().await;
+ if start_result.is_err() {
+ eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+ return;
+ }
// build message
let message = MessageBuilder::builder()
@@ -42,10 +46,23 @@
.unwrap();
// send message to rocketmq proxy
- let result = producer.send(message).await;
- debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+ let send_result = producer.send(message).await;
+ if send_result.is_err() {
+ eprintln!("send message failed: {:?}", send_result.unwrap_err());
+ return;
+ }
println!(
"send message success, message_id={}",
- result.unwrap().message_id()
+ send_result.unwrap().message_id()
);
+
+ // shutdown the producer when you don't need it anymore.
+ // you should shutdown it manually to gracefully stop and unregister from server
+ let shutdown_result = producer.shutdown().await;
+ if shutdown_result.is_ok() {
+ eprintln!(
+ "producer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index a5f9408..fc1a838 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -20,8 +20,8 @@
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // simple consumer will prefetch topic route when start and failed fast if topic not exist
+ // It's recommended to specify the topics that applications will publish messages to
+ // because the simple consumer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut consumer_option = SimpleConsumerOption::default();
consumer_option.set_topics(vec!["test_topic"]);
consumer_option.set_consumer_group("SimpleConsumerGroup");
@@ -33,38 +33,54 @@
// build and start simple consumer
let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
- consumer.start().await.unwrap();
-
- loop {
- // pop message from rocketmq proxy
- let receive_result = consumer
- .receive(
- "test_topic".to_string(),
- &FilterExpression::new(FilterType::Tag, "test_tag"),
- )
- .await;
- debug_assert!(
- receive_result.is_ok(),
- "receive message failed: {:?}",
- receive_result.unwrap_err()
+ let start_result = consumer.start().await;
+ if start_result.is_err() {
+ eprintln!(
+ "simple consumer start failed: {:?}",
+ start_result.unwrap_err()
);
+ return;
+ }
- let messages = receive_result.unwrap();
+ // pop message from rocketmq proxy
+ let receive_result = consumer
+ .receive(
+ "test_topic".to_string(),
+ &FilterExpression::new(FilterType::Tag, "test_tag"),
+ )
+ .await;
+ if receive_result.is_err() {
+ eprintln!("receive message failed: {:?}", receive_result.unwrap_err());
+ return;
+ }
- if messages.is_empty() {
- println!("no message received");
- return;
- }
+ let messages = receive_result.unwrap();
- for message in messages {
- println!("receive message: {:?}", message);
- // ack message to rocketmq proxy
- let ack_result = consumer.ack(&message).await;
- debug_assert!(
- ack_result.is_ok(),
- "ack message failed: {:?}",
+ if messages.is_empty() {
+ println!("no message received");
+ return;
+ }
+
+ for message in messages {
+ println!("receive message: {:?}", message);
+ // ack message to rocketmq proxy
+ let ack_result = consumer.ack(&message).await;
+ if ack_result.is_err() {
+ eprintln!(
+ "ack message {} failed: {:?}",
+ message.message_id(),
ack_result.unwrap_err()
);
}
}
+
+ // shutdown the simple consumer when you don't need it anymore.
+ // you should shutdown it manually to gracefully stop and unregister from server
+ let shutdown_result = consumer.shutdown().await;
+ if shutdown_result.is_err() {
+ eprintln!(
+ "simple consumer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs
index 7423cc9..6df6cb6 100644
--- a/rust/examples/transaction_producer.rs
+++ b/rust/examples/transaction_producer.rs
@@ -28,8 +28,8 @@
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // producer will prefetch topic route when start and failed fast if topic not exist
+ // It's recommended to specify the topics that applications will publish messages to
+ // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["transaction_test"]);
@@ -62,7 +62,11 @@
}),
)
.unwrap();
- producer.start().await.unwrap();
+ let start_result = producer.start().await;
+ if start_result.is_err() {
+ eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+ return;
+ }
// build message
let message = MessageBuilder::transaction_message_builder(
@@ -93,5 +97,18 @@
// commit transaction manually
// delete following two lines so that RocketMQ server will check transaction status periodically
let result = transaction.commit().await;
- debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
+ if result.is_err() {
+ eprintln!("commit transaction failed: {:?}", result.unwrap_err());
+ return;
+ }
+
+ // shutdown the producer when you don't need it anymore.
+ // you should shutdown it manually to gracefully stop and unregister from server
+ let shutdown_result = producer.shutdown().await;
+ if shutdown_result.is_err() {
+ eprintln!(
+ "transaction producer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 57bddb8..3804b4a 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -37,9 +37,9 @@
use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
use crate::pb::{
AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest, FilterExpression,
- HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, QueryRouteRequest,
- ReceiveMessageRequest, Resource, SendMessageRequest, Status, TelemetryCommand,
- TransactionSource,
+ HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, NotifyClientTerminationRequest,
+ QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest, Status,
+ TelemetryCommand, TransactionSource,
};
#[double]
use crate::session::SessionManager;
@@ -55,6 +55,7 @@
settings: TelemetryCommand,
transaction_checker: Option<Box<TransactionChecker>>,
telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
+ shutdown_tx: Option<oneshot::Sender<()>>,
}
lazy_static::lazy_static! {
@@ -62,6 +63,8 @@
}
const OPERATION_CLIENT_NEW: &str = "client.new";
+const OPERATION_CLIENT_START: &str = "client.start";
+const OPERATION_CLIENT_SHUTDOWN: &str = "client.shutdown";
const OPERATION_GET_SESSION: &str = "client.get_session";
const OPERATION_QUERY_ROUTE: &str = "client.query_route";
const OPERATION_HEARTBEAT: &str = "client.heartbeat";
@@ -102,11 +105,12 @@
settings,
transaction_checker: None,
telemetry_command_tx: None,
+ shutdown_tx: None,
})
}
pub(crate) fn is_started(&self) -> bool {
- self.telemetry_command_tx.is_some()
+ self.shutdown_tx.is_some()
}
pub(crate) fn has_transaction_checker(&self) -> bool {
@@ -124,20 +128,27 @@
let logger = self.logger.clone();
let session_manager = self.session_manager.clone();
- let group = self.option.group.to_string();
+ let group = self.option.group.clone();
let namespace = self.option.namespace.to_string();
let client_type = self.option.client_type.clone();
+ let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+ self.shutdown_tx = Some(shutdown_tx);
+
// send heartbeat and handle telemetry command
- let (tx, mut rx) = mpsc::channel(16);
- self.telemetry_command_tx = Some(tx);
- let rpc_client = self.get_session().await?;
+ let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
+ self.telemetry_command_tx = Some(telemetry_command_tx);
+ let rpc_client = self
+ .get_session()
+ .await
+ .map_err(|error| error.with_operation(OPERATION_CLIENT_START))?;
let endpoints = self.access_endpoints.clone();
let transaction_checker = self.transaction_checker.take();
// give a placeholder
if transaction_checker.is_some() {
self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
}
+
tokio::spawn(async move {
rpc_client.is_started();
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
@@ -178,20 +189,57 @@
debug!(logger,"send heartbeat to server success, peer={}",peer);
}
},
- command = rx.recv() => {
+ command = telemetry_command_rx.recv() => {
if let Some(command) = command {
let result = Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker, endpoints.clone(), command).await;
if let Err(error) = result {
- error!(logger, "handle telemetry command failed: {:?}", error)
+ error!(logger, "handle telemetry command failed: {:?}", error);
}
}
},
+ _ = &mut shutdown_rx => {
+ debug!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
+ break;
+ }
}
}
+ info!(
+ logger,
+ "heartbeat task and telemetry command handler are stopped"
+ );
});
Ok(())
}
+ fn check_started(&self, operation: &'static str) -> Result<(), ClientError> {
+ if !self.is_started() {
+ return Err(ClientError::new(
+ ErrorKind::ClientIsNotRunning,
+ "client is not started",
+ operation,
+ )
+ .with_context("client_id", self.id.clone()));
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn shutdown(mut self) -> Result<(), ClientError> {
+ self.check_started(OPERATION_CLIENT_SHUTDOWN)?;
+ let mut rpc_client = self.get_session().await?;
+ self.telemetry_command_tx = None;
+ if let Some(tx) = self.shutdown_tx.take() {
+ let _ = tx.send(());
+ }
+ let group = self.option.group.as_ref().map(|group| Resource {
+ name: group.to_string(),
+ resource_namespace: self.option.namespace.to_string(),
+ });
+ let response = rpc_client.notify_shutdown(NotifyClientTerminationRequest { group });
+ Self::handle_response_status(response.await?.status, OPERATION_CLIENT_SHUTDOWN)?;
+ self.session_manager.shutdown().await;
+ Ok(())
+ }
+
async fn handle_telemetry_command<T: RPCClient + 'static>(
mut rpc_client: T,
transaction_checker: &Option<Box<TransactionChecker>>,
@@ -268,14 +316,7 @@
}
pub(crate) async fn get_session(&self) -> Result<Session, ClientError> {
- if !self.is_started() {
- return Err(ClientError::new(
- ErrorKind::ClientIsNotRunning,
- "client is not started",
- OPERATION_GET_SESSION,
- )
- .with_context("client_id", self.id.clone()));
- }
+ self.check_started(OPERATION_GET_SESSION)?;
let session = self
.session_manager
.get_or_create_session(
@@ -348,8 +389,8 @@
return Ok(route);
}
}
- self.topic_route_inner(self.get_session().await.unwrap(), topic)
- .await
+ let rpc_client = self.get_session().await?;
+ self.topic_route_inner(rpc_client, topic).await
}
async fn query_topic_route<T: RPCClient + 'static>(
@@ -461,15 +502,16 @@
async fn heart_beat_inner<T: RPCClient + 'static>(
mut rpc_client: T,
- group: &str,
+ group: &Option<String>,
namespace: &str,
client_type: &ClientType,
) -> Result<HeartbeatResponse, ClientError> {
+ let group = group.as_ref().map(|group| Resource {
+ name: group.to_string(),
+ resource_namespace: namespace.to_string(),
+ });
let request = HeartbeatRequest {
- group: Some(Resource {
- name: group.to_string(),
- resource_namespace: namespace.to_string(),
- }),
+ group,
client_type: client_type.clone() as i32,
};
let response = rpc_client.heartbeat(request).await?;
@@ -534,7 +576,7 @@
) -> Result<Vec<Message>, ClientError> {
let request = ReceiveMessageRequest {
group: Some(Resource {
- name: self.option.group.to_string(),
+ name: self.option.group.as_ref().unwrap().to_string(),
resource_namespace: self.option.namespace.to_string(),
}),
message_queue: Some(message_queue),
@@ -594,7 +636,7 @@
) -> Result<Vec<AckMessageResultEntry>, ClientError> {
let request = AckMessageRequest {
group: Some(Resource {
- name: self.option.group.to_string(),
+ name: self.option.group.as_ref().unwrap().to_string(),
resource_namespace: self.option.namespace.to_string(),
}),
topic: Some(Resource {
@@ -642,7 +684,10 @@
fn new_client_for_test() -> Client {
Client {
logger: terminal_logger(),
- option: ClientOption::default(),
+ option: ClientOption {
+ group: Some("group".to_string()),
+ ..Default::default()
+ },
session_manager: Arc::new(SessionManager::default()),
route_table: Mutex::new(HashMap::new()),
id: Client::generate_client_id(),
@@ -650,6 +695,7 @@
settings: TelemetryCommand::default(),
transaction_checker: None,
telemetry_command_tx: None,
+ shutdown_tx: None,
}
}
@@ -665,6 +711,7 @@
settings: TelemetryCommand::default(),
transaction_checker: None,
telemetry_command_tx: Some(tx),
+ shutdown_tx: None,
}
}
@@ -714,7 +761,8 @@
.expect_get_or_create_session()
.returning(|_, _, _| Ok(Session::mock()));
- let client = new_client_with_session_manager(session_manager);
+ let mut client = new_client_with_session_manager(session_manager);
+ let _ = client.start().await;
let result = client.get_session().await;
assert!(result.is_ok());
let result = client
@@ -893,7 +941,9 @@
mock.expect_heartbeat()
.return_once(|_| Box::pin(futures::future::ready(response)));
- let send_result = Client::heart_beat_inner(mock, "", "", &ClientType::Producer).await;
+ let send_result =
+ Client::heart_beat_inner(mock, &Some("group".to_string()), "", &ClientType::Producer)
+ .await;
assert!(send_result.is_ok());
}
@@ -1034,7 +1084,7 @@
let result = Client::handle_telemetry_command(
mock,
&Some(Box::new(|_, _| TransactionResolution::COMMIT)),
- Endpoints::from_url("localhopst:8081").unwrap(),
+ Endpoints::from_url("localhost:8081").unwrap(),
RecoverOrphanedTransactionCommand(pb::RecoverOrphanedTransactionCommand {
message: Some(Message {
topic: Some(Resource::default()),
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index ea27d27..fa16a41 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -29,7 +29,7 @@
#[derive(Debug, Clone)]
pub struct ClientOption {
pub(crate) client_type: ClientType,
- pub(crate) group: String,
+ pub(crate) group: Option<String>,
pub(crate) namespace: String,
pub(crate) access_url: String,
pub(crate) enable_tls: bool,
@@ -43,7 +43,7 @@
fn default() -> Self {
ClientOption {
client_type: ClientType::Producer,
- group: "".to_string(),
+ group: None,
namespace: "".to_string(),
access_url: "localhost:8081".to_string(),
enable_tls: false,
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index d5e768c..7e3f399 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -275,8 +275,9 @@
}
let topic = message.take_topic();
let receipt = self.send(message).await?;
+ let rpc_client = self.client.get_session().await?;
Ok(TransactionImpl::new(
- Box::new(self.client.get_session().await.unwrap()),
+ Box::new(rpc_client),
Resource {
resource_namespace: self.option.namespace().to_string(),
name: topic,
@@ -284,6 +285,10 @@
receipt,
))
}
+
+ pub async fn shutdown(self) -> Result<(), ClientError> {
+ self.client.shutdown().await
+ }
}
#[cfg(test)]
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 5762d66..9441f7c 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -22,7 +22,7 @@
use slog::{debug, error, info, o, Logger};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::metadata::{AsciiMetadataValue, MetadataMap};
@@ -34,9 +34,9 @@
use crate::pb::telemetry_command::Command;
use crate::pb::{
AckMessageRequest, AckMessageResponse, EndTransactionRequest, EndTransactionResponse,
- HeartbeatRequest, HeartbeatResponse, QueryRouteRequest, QueryRouteResponse,
- ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest, SendMessageResponse,
- TelemetryCommand,
+ HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest,
+ NotifyClientTerminationResponse, QueryRouteRequest, QueryRouteResponse, ReceiveMessageRequest,
+ ReceiveMessageResponse, SendMessageRequest, SendMessageResponse, TelemetryCommand,
};
use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION};
use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient};
@@ -50,6 +50,7 @@
const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message";
const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message";
const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction";
+const OPERATION_NOTIFY_CLIENT_TERMINATION: &str = "rpc.notify_client_termination";
#[async_trait]
#[automock]
@@ -78,17 +79,36 @@
&mut self,
request: EndTransactionRequest,
) -> Result<EndTransactionResponse, ClientError>;
+ async fn notify_shutdown(
+ &mut self,
+ request: NotifyClientTerminationRequest,
+ ) -> Result<NotifyClientTerminationResponse, ClientError>;
}
#[allow(dead_code)]
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub(crate) struct Session {
logger: Logger,
client_id: String,
option: ClientOption,
endpoints: Endpoints,
stub: MessagingServiceClient<Channel>,
- telemetry_tx: Box<Option<mpsc::Sender<TelemetryCommand>>>,
+ telemetry_tx: Option<mpsc::Sender<TelemetryCommand>>,
+ shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+impl Clone for Session {
+ fn clone(&self) -> Self {
+ Session {
+ logger: self.logger.clone(),
+ client_id: self.client_id.clone(),
+ option: self.option.clone(),
+ endpoints: self.endpoints.clone(),
+ stub: self.stub.clone(),
+ telemetry_tx: None,
+ shutdown_tx: None,
+ }
+ }
}
impl Session {
@@ -108,7 +128,8 @@
stub: MessagingServiceClient::new(
Channel::from_static("http://localhost:8081").connect_lazy(),
),
- telemetry_tx: Box::new(None),
+ telemetry_tx: None,
+ shutdown_tx: None,
}
}
@@ -159,7 +180,8 @@
endpoints: endpoints.clone(),
client_id,
stub,
- telemetry_tx: Box::new(None),
+ telemetry_tx: None,
+ shutdown_tx: None,
})
}
@@ -288,24 +310,35 @@
.set_source(e)
})?;
+ let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+ self.shutdown_tx = Some(shutdown_tx);
+
let logger = self.logger.clone();
tokio::spawn(async move {
let mut stream = response.into_inner();
loop {
- match stream.message().await {
- Ok(Some(item)) => {
- debug!(logger, "receive telemetry command: {:?}", item);
- if let Some(command) = item.command {
- _ = telemetry_command_tx.send(command).await;
+ tokio::select! {
+ message = stream.message() => {
+ match message {
+ Ok(Some(item)) => {
+ debug!(logger, "receive telemetry command: {:?}", item);
+ if let Some(command) = item.command {
+ _ = telemetry_command_tx.send(command).await;
+ }
+ }
+ Ok(None) => {
+ info!(logger, "telemetry command stream closed by server");
+ break;
+ }
+ Err(e) => {
+ error!(logger, "telemetry response error: {:?}", e);
+ }
}
}
- Ok(None) => {
- debug!(logger, "request stream closed");
+ _ = &mut shutdown_rx => {
+ info!(logger, "receive shutdown signal, stop dealing with telemetry command");
break;
}
- Err(e) => {
- error!(logger, "telemetry response error: {:?}", e);
- }
}
}
});
@@ -314,9 +347,15 @@
Ok(())
}
+ pub(crate) fn shutdown(&mut self) {
+ if let Some(tx) = self.shutdown_tx.take() {
+ let _ = tx.send(());
+ }
+ }
+
#[allow(dead_code)]
pub(crate) fn is_started(&self) -> bool {
- self.telemetry_tx.is_some()
+ self.shutdown_tx.is_some()
}
#[allow(dead_code)]
@@ -465,6 +504,26 @@
})?;
Ok(response.into_inner())
}
+
+ async fn notify_shutdown(
+ &mut self,
+ request: NotifyClientTerminationRequest,
+ ) -> Result<NotifyClientTerminationResponse, ClientError> {
+ let request = self.sign(request);
+ let response = self
+ .stub
+ .notify_client_termination(request)
+ .await
+ .map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc notify_client_termination failed",
+ OPERATION_NOTIFY_CLIENT_TERMINATION,
+ )
+ .set_source(e)
+ })?;
+ Ok(response.into_inner())
+ }
}
#[derive(Debug)]
@@ -521,6 +580,14 @@
}
Ok(sessions)
}
+
+ pub(crate) async fn shutdown(&self) {
+ let mut session_map = self.session_map.lock().await;
+ for (_, session) in session_map.iter_mut() {
+ session.shutdown();
+ }
+ session_map.clear();
+ }
}
#[cfg(test)]
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index d4e222e..a877705 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -67,7 +67,7 @@
let client_option = ClientOption {
client_type: ClientType::SimpleConsumer,
- group: option.consumer_group().to_string(),
+ group: Some(option.consumer_group().to_string()),
namespace: option.namespace().to_string(),
..client_option
};
@@ -104,6 +104,10 @@
Ok(())
}
+ pub async fn shutdown(self) -> Result<(), ClientError> {
+ self.client.shutdown().await
+ }
+
/// receive messages from the specified topic
///
/// # Arguments
@@ -115,7 +119,7 @@
topic: impl AsRef<str>,
expression: &FilterExpression,
) -> Result<Vec<MessageView>, ClientError> {
- self.receive_with_batch_size(topic.as_ref(), expression, 32, Duration::from_secs(15))
+ self.receive_with(topic.as_ref(), expression, 32, Duration::from_secs(15))
.await
}
@@ -127,7 +131,7 @@
/// * `expression` - the subscription for the topic
/// * `batch_size` - max message num of server returned
/// * `invisible_duration` - set the invisible duration of messages that return from the server, these messages will not be visible to other consumers unless timeout
- pub async fn receive_with_batch_size(
+ pub async fn receive_with(
&self,
topic: impl AsRef<str>,
expression: &FilterExpression,