[rust] check route equality and keep existing route in case failure to query route from remote (#710)
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 4183256..f3bcb28 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -397,10 +397,17 @@
"query route for topic={} success: route={:?}", topic, route
);
let route = Arc::new(route);
- let prev = self
- .route_table
- .lock()
- .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
+ let mut route_table_lock = self.route_table.lock();
+
+ // if message queues in previous and new route are the same, just keep the previous.
+ if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
+ if prev.queue == route.queue {
+ return Ok(Arc::clone(prev));
+ }
+ }
+
+ let prev =
+ route_table_lock.insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
info!(self.logger, "update route for topic={}", topic);
if let Some(RouteStatus::Querying(Some(mut v))) = prev {
@@ -415,7 +422,12 @@
self.logger,
"query route for topic={} failed: error={}", topic, err
);
- let prev = self.route_table.lock().remove(topic);
+ let mut route_table_lock = self.route_table.lock();
+ // keep the existing route if error occurs.
+ if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
+ return Ok(Arc::clone(prev));
+ }
+ let prev = route_table_lock.remove(topic);
if let Some(RouteStatus::Querying(Some(mut v))) = prev {
for item in v.drain(..) {
let _ = item.send(Err(ClientError::new(
@@ -926,6 +938,72 @@
}
#[tokio::test]
+ async fn client_query_existing_route_with_failed_request() {
+ let client = new_client_for_test();
+ let message_queues = if let Ok(QueryRouteResponse {
+ status: _,
+ message_queues,
+ }) = new_topic_route_response()
+ {
+ message_queues
+ } else {
+ vec![]
+ };
+ client.route_table.lock().insert(
+ "DefaultCluster".to_string(),
+ RouteStatus::Found(Arc::new(Route {
+ index: AtomicUsize::new(0),
+ queue: message_queues,
+ })),
+ );
+
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_query_route().return_once(|_| {
+ sleep(Duration::from_millis(200));
+ Box::pin(futures::future::ready(Err(ClientError::new(
+ ErrorKind::Server,
+ "server error",
+ "test",
+ ))))
+ });
+
+ let result = client.topic_route_inner(mock, "DefaultCluster").await;
+ assert!(result.is_ok());
+ }
+
+ #[tokio::test]
+ async fn client_update_same_route() {
+ let client = new_client_for_test();
+
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_query_route()
+ .return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));
+
+ let result = client.topic_route_inner(mock, "DefaultCluster").await;
+ assert!(result.is_ok());
+
+ let route = result.unwrap();
+ assert!(!route.queue.is_empty());
+ route.index.fetch_add(1, Ordering::Relaxed);
+
+ let topic = &route.queue[0].topic;
+ assert!(topic.is_some());
+
+ let topic = topic.clone().unwrap();
+ assert_eq!(topic.name, "DefaultCluster");
+ assert_eq!(topic.resource_namespace, "default");
+
+ mock = session::MockRPCClient::new();
+ mock.expect_query_route()
+ .return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));
+
+ let result2 = client.topic_route_inner(mock, "DefaultCluster").await;
+ assert!(result2.is_ok());
+
+ let route2 = result2.unwrap();
+ assert_eq!(1, route2.index.load(Ordering::Relaxed));
+ }
+ #[tokio::test]
async fn client_heartbeat() {
let response = Ok(HeartbeatResponse {
status: Some(Status {