Merge pull request #674 from apache/java_release_5.0.6
Java release 5.0.6
diff --git a/.asf.yaml b/.asf.yaml
index 4f5661e..1401283 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -33,7 +33,7 @@
# Enable squash button
squash: true
- # Disable merge button
+ # Enable merge button
- merge: false
+ merge: true
# Disable rebase button
rebase: false
protected_branches:
diff --git a/golang/client.go b/golang/client.go
index 45e4b54..71c4819 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -24,7 +24,6 @@
"encoding/hex"
"errors"
"fmt"
- "reflect"
"sync"
"time"
@@ -32,6 +31,7 @@
"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+ "github.com/golang/protobuf/proto"
"github.com/google/uuid"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -504,27 +504,40 @@
f := func() {
cli.router.Range(func(k, v interface{}) bool {
topic := k.(string)
- oldRoute := v
newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
if err != nil {
cli.log.Errorf("scheduled queryRoute err=%v", err)
}
- if newRoute == nil && oldRoute != nil {
+ if newRoute == nil && v != nil {
cli.log.Info("newRoute is nil, but oldRoute is not. do not update")
return true
}
- if !reflect.DeepEqual(newRoute, oldRoute) {
+ var oldRoute []*v2.MessageQueue
+ if v != nil {
+ oldRoute = v.([]*v2.MessageQueue)
+ }
+ if !routeEqual(oldRoute, newRoute) {
cli.router.Store(k, newRoute)
switch impl := cli.clientImpl.(type) {
case *defaultProducer:
- plb, err := NewPublishingLoadBalancer(newRoute)
- if err == nil {
- impl.publishingRouteDataResultCache.Store(topic, plb)
+ existing, ok := impl.publishingRouteDataResultCache.Load(topic)
+ if !ok {
+ plb, err := NewPublishingLoadBalancer(newRoute)
+ if err == nil {
+ impl.publishingRouteDataResultCache.Store(topic, plb)
+ }
+ } else {
+ impl.publishingRouteDataResultCache.Store(topic, existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute))
}
case *defaultSimpleConsumer:
- slb, err := NewSubscriptionLoadBalancer(newRoute)
- if err == nil {
- impl.subTopicRouteDataResultCache.Store(topic, slb)
+ existing, ok := impl.subTopicRouteDataResultCache.Load(topic)
+ if !ok {
+ slb, err := NewSubscriptionLoadBalancer(newRoute)
+ if err == nil {
+ impl.subTopicRouteDataResultCache.Store(topic, slb)
+ }
+ } else {
+ impl.subTopicRouteDataResultCache.Store(topic, existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute))
}
}
}
@@ -534,6 +547,19 @@
ticker.Tick(f, time.Second*30, cli.done)
return nil
}
+
+// routeEqual reports whether two route snapshots hold the same message queues
+// in the same order. Elements are compared with proto.Equal rather than
+// reflect.DeepEqual, so only protobuf field values are taken into account.
+// A nil route and an empty route are considered equal.
+func routeEqual(oldRoute, newRoute []*v2.MessageQueue) bool {
+	if len(oldRoute) != len(newRoute) {
+		return false
+	}
+	// Lengths match, so indexing newRoute with oldRoute's indices is safe.
+	for i, mq := range oldRoute {
+		if !proto.Equal(mq, newRoute[i]) {
+			return false
+		}
+	}
+	return true
+}
+
func (cli *defaultClient) notifyClientTermination() {
cli.log.Info("start notifyClientTermination")
ctx := cli.Sign(context.Background())
diff --git a/golang/client_test.go b/golang/client_test.go
index b46d2d7..4549bdf 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -20,6 +20,7 @@
import (
"context"
"fmt"
+ "reflect"
"testing"
"time"
@@ -293,3 +294,48 @@
assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
assert.Equal(t, "Failed to recover, err=EOF", commandExecutionLog[1].Message)
}
+
+// Test_routeEqual checks that routeEqual compares routes by protobuf content:
+// two queues built from identical field values stay equal even after one of
+// them has been touched through the reflection API, which is enough to make
+// reflect.DeepEqual report a difference.
+func Test_routeEqual(t *testing.T) {
+	// buildQueue returns a fresh queue with the same field values on every call.
+	buildQueue := func() *v2.MessageQueue {
+		return &v2.MessageQueue{
+			Topic:      &v2.Resource{Name: "topic-test"},
+			Id:         0,
+			Permission: v2.Permission_READ_WRITE,
+			Broker: &v2.Broker{
+				Name:      "broker-test",
+				Id:        0,
+				Endpoints: fakeEndpoints(),
+			},
+			AcceptMessageTypes: []v2.MessageType{v2.MessageType_NORMAL},
+		}
+	}
+	oldMq := buildQueue()
+	newMq := buildQueue()
+
+	// Calling ProtoReflect changes internal (non-field) state of the message.
+	newMq.ProtoReflect()
+
+	oldRoute := []*v2.MessageQueue{oldMq}
+	newRoute := []*v2.MessageQueue{newMq}
+
+	assert.False(t, reflect.DeepEqual(oldRoute, newRoute))
+	assert.True(t, routeEqual(oldRoute, newRoute))
+	assert.True(t, routeEqual(nil, nil))
+	assert.False(t, routeEqual(nil, newRoute))
+	assert.False(t, routeEqual(oldRoute, nil))
+	assert.True(t, routeEqual(nil, []*v2.MessageQueue{}))
+}
diff --git a/golang/loadBalancer.go b/golang/loadBalancer.go
index db9cbd4..da2dd64 100644
--- a/golang/loadBalancer.go
+++ b/golang/loadBalancer.go
@@ -31,6 +31,7 @@
type PublishingLoadBalancer interface {
TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error)
+ CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer
}
type publishingLoadBalancer struct {
@@ -119,8 +120,16 @@
return candidates, nil
}
+// CopyAndUpdate returns a new publishing load balancer that serves
+// messageQueues while carrying over the receiver's current index.
+// The receiver itself is left unchanged.
+func (plb *publishingLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) PublishingLoadBalancer {
+	updated := new(publishingLoadBalancer)
+	updated.messageQueues = messageQueues
+	updated.index = plb.index
+	return updated
+}
+
type SubscriptionLoadBalancer interface {
TakeMessageQueue() (*v2.MessageQueue, error)
+ CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer
}
type subscriptionLoadBalancer struct {
@@ -147,3 +156,10 @@
selectMessageQueue := slb.messageQueues[idx]
return selectMessageQueue, nil
}
+
+// CopyAndUpdate returns a new subscription load balancer that serves
+// messageQueues while carrying over the receiver's current index.
+// The receiver itself is left unchanged.
+func (slb *subscriptionLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) SubscriptionLoadBalancer {
+	updated := new(subscriptionLoadBalancer)
+	updated.messageQueues = messageQueues
+	updated.index = slb.index
+	return updated
+}
diff --git a/nodejs/src/client/BaseClient.ts b/nodejs/src/client/BaseClient.ts
index ad67e08..9fa12da 100644
--- a/nodejs/src/client/BaseClient.ts
+++ b/nodejs/src/client/BaseClient.ts
@@ -137,7 +137,7 @@
// heartbeat every 10s
this.#timers.push(setInterval(async () => {
this.#doHeartbeat();
- }, 5 * 60000));
+ }, 10000));
// doStats every 60s
// doStats()
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index da47ed2..bc88827 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -58,7 +58,7 @@
byteorder = "1"
mac_address = "1.1.4"
hex = "0.4.3"
-time = "0.3"
+time = { version = "0.3", features = ["local-offset"] }
once_cell = "1.18.0"
mockall = "0.11.4"
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 91bd369..884f98b 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -31,16 +31,14 @@
use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
-use crate::model::message::{AckMessageEntry, MessageView};
-use crate::model::transaction::{TransactionChecker, TransactionResolution};
+use crate::model::message::AckMessageEntry;
use crate::pb;
use crate::pb::receive_message_response::Content;
-use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings};
use crate::pb::{
AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, Code,
- EndTransactionRequest, FilterExpression, HeartbeatRequest, HeartbeatResponse, Message,
- MessageQueue, NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest,
- Resource, SendMessageRequest, Status, TelemetryCommand, TransactionSource,
+ FilterExpression, HeartbeatRequest, HeartbeatResponse, Message, MessageQueue,
+ NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest, Resource,
+ SendMessageRequest, Status, TelemetryCommand,
};
#[double]
use crate::session::SessionManager;
@@ -54,7 +52,6 @@
id: String,
access_endpoints: Endpoints,
settings: TelemetryCommand,
- transaction_checker: Option<Box<TransactionChecker>>,
telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
@@ -70,8 +67,6 @@
const OPERATION_SEND_MESSAGE: &str = "client.send_message";
const OPERATION_RECEIVE_MESSAGE: &str = "client.receive_message";
const OPERATION_ACK_MESSAGE: &str = "client.ack_message";
-const OPERATION_END_TRANSACTION: &str = "client.end_transaction";
-const OPERATION_HANDLE_TELEMETRY_COMMAND: &str = "client.handle_telemetry_command";
impl Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -102,28 +97,23 @@
id,
access_endpoints: endpoints,
settings,
- transaction_checker: None,
telemetry_command_tx: None,
shutdown_tx: None,
})
}
+    /// Returns an owned copy of the client's access endpoints.
+    pub(crate) fn get_endpoints(&self) -> Endpoints {
+        let endpoints = &self.access_endpoints;
+        endpoints.clone()
+    }
+
pub(crate) fn is_started(&self) -> bool {
self.shutdown_tx.is_some()
}
- pub(crate) fn has_transaction_checker(&self) -> bool {
- self.transaction_checker.is_some()
- }
-
- pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
- if self.is_started() {
- panic!("client {} is started, can not be modified", self.id)
- }
- self.transaction_checker = Some(transaction_checker);
- }
-
- pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
+ pub(crate) async fn start(
+ &mut self,
+ telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command>,
+ ) -> Result<(), ClientError> {
let logger = self.logger.clone();
let session_manager = self.session_manager.clone();
@@ -134,19 +124,12 @@
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
self.shutdown_tx = Some(shutdown_tx);
- // send heartbeat and handle telemetry command
- let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
self.telemetry_command_tx = Some(telemetry_command_tx);
+
let rpc_client = self
.get_session()
.await
.map_err(|error| error.with_operation(OPERATION_CLIENT_START))?;
- let endpoints = self.access_endpoints.clone();
- let transaction_checker = self.transaction_checker.take();
- // give a placeholder
- if transaction_checker.is_some() {
- self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
- }
tokio::spawn(async move {
rpc_client.is_started();
@@ -188,24 +171,13 @@
debug!(logger,"send heartbeat to server success, peer={}",peer);
}
},
- command = telemetry_command_rx.recv() => {
- if let Some(command) = command {
- let result = Self::handle_telemetry_command(rpc_client.shadow_session(), &transaction_checker, endpoints.clone(), command).await;
- if let Err(error) = result {
- error!(logger, "handle telemetry command failed: {:?}", error);
- }
- }
- },
_ = &mut shutdown_rx => {
- info!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
+ info!(logger, "receive shutdown signal, stop heartbeat task.");
break;
}
}
}
- info!(
- logger,
- "heartbeat task and telemetry command handler are stopped"
- );
+ info!(logger, "heartbeat task is stopped");
});
Ok(())
}
@@ -239,58 +211,6 @@
Ok(())
}
- async fn handle_telemetry_command<T: RPCClient + 'static>(
- mut rpc_client: T,
- transaction_checker: &Option<Box<TransactionChecker>>,
- endpoints: Endpoints,
- command: pb::telemetry_command::Command,
- ) -> Result<(), ClientError> {
- return match command {
- RecoverOrphanedTransactionCommand(command) => {
- let transaction_id = command.transaction_id;
- let message = command.message.unwrap();
- let message_id = message
- .system_properties
- .as_ref()
- .unwrap()
- .message_id
- .clone();
- let topic = message.topic.as_ref().unwrap().clone();
- if let Some(transaction_checker) = transaction_checker {
- let resolution = transaction_checker(
- transaction_id.clone(),
- MessageView::from_pb_message(message, endpoints),
- );
-
- let response = rpc_client
- .end_transaction(EndTransactionRequest {
- topic: Some(topic),
- message_id: message_id.to_string(),
- transaction_id,
- resolution: resolution as i32,
- source: TransactionSource::SourceServerCheck as i32,
- trace_context: "".to_string(),
- })
- .await?;
- Self::handle_response_status(response.status, OPERATION_END_TRANSACTION)
- } else {
- Err(ClientError::new(
- ErrorKind::Config,
- "failed to get transaction checker",
- OPERATION_END_TRANSACTION,
- ))
- }
- }
- Settings(_) => Ok(()),
- _ => Err(ClientError::new(
- ErrorKind::Config,
- "receive telemetry command but there is no handler",
- OPERATION_HANDLE_TELEMETRY_COMMAND,
- )
- .with_context("command", format!("{:?}", command))),
- };
- }
-
pub(crate) fn client_id(&self) -> &str {
&self.id
}
@@ -704,13 +624,11 @@
use crate::error::{ClientError, ErrorKind};
use crate::log::terminal_logger;
use crate::model::common::{ClientType, Route};
- use crate::model::transaction::TransactionResolution;
use crate::pb::receive_message_response::Content;
use crate::pb::{
AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, Code,
- EndTransactionResponse, FilterExpression, HeartbeatResponse, Message, MessageQueue,
- QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse, Status,
- SystemProperties, TelemetryCommand,
+ FilterExpression, HeartbeatResponse, Message, MessageQueue, QueryRouteResponse,
+ ReceiveMessageResponse, Resource, SendMessageResponse, Status, TelemetryCommand,
};
use crate::session;
@@ -731,7 +649,6 @@
id: Client::generate_client_id(),
access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
settings: TelemetryCommand::default(),
- transaction_checker: None,
telemetry_command_tx: None,
shutdown_tx: None,
}
@@ -747,7 +664,6 @@
id: Client::generate_client_id(),
access_endpoints: Endpoints::from_url("http://localhost:8081").unwrap(),
settings: TelemetryCommand::default(),
- transaction_checker: None,
telemetry_command_tx: Some(tx),
shutdown_tx: None,
}
@@ -784,7 +700,8 @@
.returning(|_, _, _| Ok(Session::mock()));
let mut client = new_client_with_session_manager(session_manager);
- client.start().await?;
+ let (tx, _) = mpsc::channel(16);
+ client.start(tx).await?;
// TODO use countdown latch instead sleeping
// wait for run
@@ -800,7 +717,8 @@
.returning(|_, _, _| Ok(Session::mock()));
let mut client = new_client_with_session_manager(session_manager);
- let _ = client.start().await;
+ let (tx, _rx) = mpsc::channel(16);
+ let _ = client.start(tx).await;
let result = client.get_session().await;
assert!(result.is_ok());
let result = client
@@ -1134,33 +1052,4 @@
assert_eq!(error.message, "server return an error");
assert_eq!(error.operation, "client.ack_message");
}
-
- #[tokio::test]
- async fn client_handle_telemetry_command() {
- let response = Ok(EndTransactionResponse {
- status: Some(Status {
- code: Code::Ok as i32,
- message: "".to_string(),
- }),
- });
- let mut mock = session::MockRPCClient::new();
- mock.expect_end_transaction()
- .return_once(|_| Box::pin(futures::future::ready(response)));
- let result = Client::handle_telemetry_command(
- mock,
- &Some(Box::new(|_, _| TransactionResolution::COMMIT)),
- Endpoints::from_url("localhost:8081").unwrap(),
- RecoverOrphanedTransactionCommand(pb::RecoverOrphanedTransactionCommand {
- message: Some(Message {
- topic: Some(Resource::default()),
- user_properties: Default::default(),
- system_properties: Some(SystemProperties::default()),
- body: vec![],
- }),
- transaction_id: "".to_string(),
- }),
- )
- .await;
- assert!(result.is_ok())
- }
}
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 59d1eeb..5210842 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -33,6 +33,9 @@
#[error("Message is invalid")]
InvalidMessage,
+ #[error("Message type not match with topic accept message type")]
+ MessageTypeNotMatch,
+
#[error("Server error")]
Server,
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 28c232e..1defcd8 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -21,9 +21,18 @@
use crate::error::{ClientError, ErrorKind};
use crate::model::common::Endpoints;
+use crate::model::message::MessageType::{DELAY, FIFO, NORMAL, TRANSACTION};
use crate::model::message_id::UNIQ_ID_GENERATOR;
use crate::pb;
+/// Kind of message being sent.
+///
+/// The discriminants are cast to `i32` when a message is converted to its
+/// protobuf form and when a queue's `accept_message_types` is checked, so
+/// they must stay in sync with the numeric values used on the wire.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum MessageType {
+    /// Plain message without group, delay or transaction semantics.
+    NORMAL = 1,
+    /// Message carrying a message group.
+    FIFO = 2,
+    /// Message carrying a delivery timestamp.
+    DELAY = 3,
+    /// Message sent with transactions enabled.
+    TRANSACTION = 4,
+}
+
/// [`Message`] is the data model for sending.
pub trait Message {
fn take_message_id(&mut self) -> String;
@@ -35,6 +44,17 @@
fn take_message_group(&mut self) -> Option<String>;
fn take_delivery_timestamp(&mut self) -> Option<i64>;
fn transaction_enabled(&mut self) -> bool;
+ fn get_message_type(&self) -> MessageType;
+}
+
+/// Implemented by types that can tell whether they accept a given [`MessageType`].
+pub trait MessageTypeAware {
+    /// Returns `true` when `message_type` is accepted by `self`.
+    fn accept_type(&self, message_type: MessageType) -> bool;
+}
+
+impl MessageTypeAware for pb::MessageQueue {
+    fn accept_type(&self, message_type: MessageType) -> bool {
+        // The queue stores accepted types as raw `i32` values.
+        let wanted = message_type as i32;
+        self.accept_message_types
+            .iter()
+            .any(|accepted| *accepted == wanted)
+    }
}
pub(crate) struct MessageImpl {
@@ -47,6 +67,7 @@
pub(crate) message_group: Option<String>,
pub(crate) delivery_timestamp: Option<i64>,
pub(crate) transaction_enabled: bool,
+ pub(crate) message_type: MessageType,
}
impl Message for MessageImpl {
@@ -85,6 +106,10 @@
fn transaction_enabled(&mut self) -> bool {
self.transaction_enabled
}
+
+    // Returns the message type recorded when the message was built.
+    fn get_message_type(&self) -> MessageType {
+        let Self { message_type, .. } = self;
+        *message_type
+    }
}
/// [`MessageBuilder`] is the builder for [`Message`].
@@ -108,6 +133,7 @@
message_group: None,
delivery_timestamp: None,
transaction_enabled: false,
+ message_type: NORMAL,
},
}
}
@@ -135,6 +161,7 @@
message_group: Some(message_group.into()),
delivery_timestamp: None,
transaction_enabled: false,
+ message_type: FIFO,
},
}
}
@@ -162,6 +189,7 @@
message_group: None,
delivery_timestamp: Some(delay_time),
transaction_enabled: false,
+ message_type: DELAY,
},
}
}
@@ -184,6 +212,7 @@
message_group: None,
delivery_timestamp: None,
transaction_enabled: true,
+ message_type: TRANSACTION,
},
}
}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 7e3f399..e456cbe 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -15,20 +15,30 @@
* limitations under the License.
*/
+use std::fmt::Debug;
+use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use mockall_double::double;
use prost_types::Timestamp;
-use slog::{info, Logger};
+use slog::{error, info, warn, Logger};
+use tokio::select;
+use tokio::sync::RwLock;
+use tokio::sync::{mpsc, oneshot};
#[double]
use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{ClientType, SendReceipt};
-use crate::model::message;
-use crate::model::transaction::{Transaction, TransactionChecker, TransactionImpl};
-use crate::pb::{Encoding, MessageType, Resource, SystemProperties};
+use crate::model::common::{ClientType, Endpoints, SendReceipt};
+use crate::model::message::{self, MessageTypeAware, MessageView};
+use crate::model::transaction::{
+ Transaction, TransactionChecker, TransactionImpl, TransactionResolution,
+};
+use crate::pb::settings::PubSub;
+use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings};
+use crate::pb::{Encoding, EndTransactionRequest, Resource, SystemProperties, TransactionSource};
+use crate::session::RPCClient;
use crate::util::{
build_endpoints_by_message_queue, build_producer_settings, select_message_queue,
select_message_queue_by_message_group, HOST_NAME,
@@ -41,17 +51,27 @@
/// Most of its methods take shared reference so that application developers may use it at will.
///
/// [`Producer`] is `Send` and `Sync` by design, so that developers may get started easily.
-#[derive(Debug)]
pub struct Producer {
- option: ProducerOption,
+ option: Arc<RwLock<ProducerOption>>,
logger: Logger,
client: Client,
+ transaction_checker: Option<Box<TransactionChecker>>,
+ shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+impl Debug for Producer {
+    /// Formats the producer showing only its `option` and `client` fields;
+    /// the logger, transaction checker and shutdown sender are left out.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = f.debug_struct("Producer");
+        repr.field("option", &self.option);
+        repr.field("client", &self.client);
+        repr.finish()
+    }
}
impl Producer {
const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message";
-
+ const OPERATION_END_TRANSACTION: &'static str = "producer.end_transaction";
/// Create a new producer instance
///
/// # Arguments
@@ -67,10 +87,13 @@
let logger = log::logger(option.logging_format());
let settings = build_producer_settings(&option, &client_option);
let client = Client::new(&logger, client_option, settings)?;
+ let option = Arc::new(RwLock::new(option));
Ok(Producer {
option,
logger,
client,
+ transaction_checker: None,
+ shutdown_tx: None,
})
}
@@ -93,23 +116,80 @@
};
let logger = log::logger(option.logging_format());
let settings = build_producer_settings(&option, &client_option);
- let mut client = Client::new(&logger, client_option, settings)?;
- client.set_transaction_checker(transaction_checker);
+ let client = Client::new(&logger, client_option, settings)?;
+ let option = Arc::new(RwLock::new(option));
Ok(Producer {
option,
logger,
client,
+ transaction_checker: Some(transaction_checker),
+ shutdown_tx: None,
})
}
+    /// Reads the namespace from the shared producer options and returns it
+    /// as an owned `String`.
+    async fn get_resource_namespace(&self) -> String {
+        let options = self.option.read().await;
+        options.namespace().to_string()
+    }
+
/// Start the producer
pub async fn start(&mut self) -> Result<(), ClientError> {
- self.client.start().await?;
- if let Some(topics) = self.option.topics() {
+ let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
+ let telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command> =
+ telemetry_command_tx;
+ self.client.start(telemetry_command_tx).await?;
+ let option_guard = self.option.read().await;
+ let topics = option_guard.topics();
+ if let Some(topics) = topics {
for topic in topics {
self.client.topic_route(topic, true).await?;
}
}
+ drop(option_guard);
+ let transaction_checker = self.transaction_checker.take();
+ if transaction_checker.is_some() {
+ self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
+ }
+ let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+ self.shutdown_tx = Some(shutdown_tx);
+ let rpc_client = self.client.get_session().await?;
+ let endpoints = self.client.get_endpoints();
+ let logger = self.logger.clone();
+ let producer_option = Arc::clone(&self.option);
+        // Background task: dispatches telemetry commands forwarded by the client
+        // until a shutdown signal arrives or the command channel is closed.
+        tokio::spawn(async move {
+            loop {
+                select! {
+                    command = telemetry_command_rx.recv() => {
+                        // `recv` returns `None` once every sender is dropped and keeps
+                        // returning it immediately, so stop instead of spinning.
+                        let command = match command {
+                            Some(command) => command,
+                            None => break,
+                        };
+                        match command {
+                            RecoverOrphanedTransactionCommand(command) => {
+                                let result = Self::handle_recover_orphaned_transaction_command(
+                                    rpc_client.shadow_session(),
+                                    command,
+                                    &transaction_checker,
+                                    endpoints.clone()).await;
+                                if let Err(error) = result {
+                                    error!(logger, "handle transaction command failed: {:?}", error);
+                                };
+                            }
+                            Settings(command) => {
+                                let option = &mut producer_option.write().await;
+                                Self::handle_settings_command(command, option);
+                                info!(logger, "handle setting command success.");
+                            }
+                            _ => {
+                                warn!(logger, "unimplemented command {:?}", command);
+                            }
+                        }
+                    }
+                    _ = &mut shutdown_rx => {
+                        break;
+                    }
+                }
+            }
+        });
info!(
self.logger,
"start producer success, client_id: {}",
@@ -118,7 +198,70 @@
Ok(())
}
- fn transform_messages_to_protobuf(
+    /// Resolves an orphaned transaction that the server asked this producer to check.
+    ///
+    /// Extracts the message, its id and its topic from `command`, asks
+    /// `transaction_checker` for a resolution and reports that resolution through
+    /// `rpc_client.end_transaction` with `TransactionSource::SourceServerCheck`.
+    ///
+    /// # Errors
+    ///
+    /// * `ErrorKind::InvalidMessage` if the command carries no message, the
+    ///   message has no system properties, or the message has no topic.
+    /// * `ErrorKind::Config` if no transaction checker is available.
+    /// * Whatever `end_transaction` or `Client::handle_response_status` returns.
+    async fn handle_recover_orphaned_transaction_command<T: RPCClient + 'static>(
+        mut rpc_client: T,
+        command: pb::RecoverOrphanedTransactionCommand,
+        transaction_checker: &Option<Box<TransactionChecker>>,
+        endpoints: Endpoints,
+    ) -> Result<(), ClientError> {
+        let invalid_message = |reason: &str| {
+            ClientError::new(
+                ErrorKind::InvalidMessage,
+                reason,
+                Self::OPERATION_END_TRANSACTION,
+            )
+        };
+
+        let transaction_id = command.transaction_id;
+        // `command` is owned, so the message is moved out rather than cloned.
+        let message = command
+            .message
+            .ok_or_else(|| invalid_message("no message in command"))?;
+        let message_id = message
+            .system_properties
+            .as_ref()
+            .map(|props| props.message_id.clone())
+            .ok_or_else(|| invalid_message("no message id exists"))?;
+        // The topic is cloned because `message` is handed to the checker below.
+        let topic = message
+            .topic
+            .clone()
+            .ok_or_else(|| invalid_message("no topic exists in message"))?;
+
+        // The checker is looked up only after the command has been validated,
+        // so malformed commands are reported as such even without a checker.
+        let transaction_checker = transaction_checker.as_ref().ok_or_else(|| {
+            ClientError::new(
+                ErrorKind::Config,
+                "failed to get transaction checker",
+                Self::OPERATION_END_TRANSACTION,
+            )
+        })?;
+
+        let resolution = transaction_checker(
+            transaction_id.clone(),
+            MessageView::from_pb_message(message, endpoints),
+        );
+        let response = rpc_client
+            .end_transaction(EndTransactionRequest {
+                topic: Some(topic),
+                message_id,
+                transaction_id,
+                resolution: resolution as i32,
+                source: TransactionSource::SourceServerCheck as i32,
+                trace_context: String::new(),
+            })
+            .await?;
+        Client::handle_response_status(response.status, Self::OPERATION_END_TRANSACTION)
+    }
+
+    /// Applies a settings command to the producer options.
+    ///
+    /// Only publishing settings are used: their `validate_message_type` flag is
+    /// copied into `option`. Any other payload is ignored.
+    fn handle_settings_command(settings: pb::Settings, option: &mut ProducerOption) {
+        match settings.pub_sub {
+            Some(PubSub::Publishing(publishing)) => {
+                option.set_validate_message_type(publishing.validate_message_type);
+            }
+            _ => {}
+        }
+    }
+
+ async fn transform_messages_to_protobuf(
&self,
messages: Vec<impl message::Message>,
) -> Result<(String, Option<String>, Vec<pb::Message>), ClientError> {
@@ -172,24 +315,20 @@
.take_delivery_timestamp()
.map(|seconds| Timestamp { seconds, nanos: 0 });
- let message_type = if message.transaction_enabled() {
+ if message.transaction_enabled() {
message_group = None;
delivery_timestamp = None;
- MessageType::Transaction as i32
} else if delivery_timestamp.is_some() {
message_group = None;
- MessageType::Delay as i32
} else if message_group.is_some() {
delivery_timestamp = None;
- MessageType::Fifo as i32
- } else {
- MessageType::Normal as i32
};
+ // TODO: use a converter trait From or TryFrom
let pb_message = pb::Message {
topic: Some(Resource {
name: message.take_topic(),
- resource_namespace: self.option.namespace().to_string(),
+ resource_namespace: self.get_resource_namespace().await,
}),
user_properties: message.take_properties(),
system_properties: Some(SystemProperties {
@@ -198,7 +337,7 @@
message_id: message.take_message_id(),
message_group,
delivery_timestamp,
- message_type,
+ message_type: message.get_message_type() as i32,
born_host: HOST_NAME.clone(),
born_timestamp: born_timestamp.clone(),
body_digest: None,
@@ -241,8 +380,13 @@
&self,
messages: Vec<impl message::Message>,
) -> Result<Vec<SendReceipt>, ClientError> {
+ let message_types = messages
+ .iter()
+ .map(|message| message.get_message_type())
+ .collect::<Vec<_>>();
+
let (topic, message_group, mut pb_messages) =
- self.transform_messages_to_protobuf(messages)?;
+ self.transform_messages_to_protobuf(messages).await?;
let route = self.client.topic_route(&topic, true).await?;
@@ -252,6 +396,25 @@
select_message_queue(route)
};
+ let option_guard = self.option.read().await;
+ let validate_message_type = option_guard.validate_message_type();
+ drop(option_guard);
+ if validate_message_type {
+ for message_type in message_types {
+ if !message_queue.accept_type(message_type) {
+ return Err(ClientError::new(
+ ErrorKind::MessageTypeNotMatch,
+ format!(
+ "Current message type {:?} not match with accepted types {:?}.",
+ message_type, message_queue.accept_message_types
+ )
+ .as_str(),
+ Self::OPERATION_SEND_MESSAGE,
+ ));
+ }
+ }
+ }
+
let endpoints =
build_endpoints_by_message_queue(&message_queue, Self::OPERATION_SEND_MESSAGE)?;
for message in pb_messages.iter_mut() {
@@ -261,12 +424,16 @@
self.client.send_message(&endpoints, pb_messages).await
}
+    /// Returns `true` when a transaction checker is set on this producer.
+    pub fn has_transaction_checker(&self) -> bool {
+        match self.transaction_checker {
+            Some(_) => true,
+            None => false,
+        }
+    }
+
/// Send message in a transaction
pub async fn send_transaction_message(
&self,
mut message: impl message::Message,
) -> Result<impl Transaction, ClientError> {
- if !self.client.has_transaction_checker() {
+ if !self.has_transaction_checker() {
return Err(ClientError::new(
ErrorKind::InvalidMessage,
"this producer can not send transaction message, please create a transaction producer using producer::new_transaction_producer",
@@ -279,14 +446,17 @@
Ok(TransactionImpl::new(
Box::new(rpc_client),
Resource {
- resource_namespace: self.option.namespace().to_string(),
+ resource_namespace: self.get_resource_namespace().await,
name: topic,
},
receipt,
))
}
- pub async fn shutdown(self) -> Result<(), ClientError> {
+ pub async fn shutdown(mut self) -> Result<(), ClientError> {
+ if let Some(tx) = self.shutdown_tx.take() {
+ let _ = tx.send(());
+ }
self.client.shutdown().await
}
}
@@ -295,13 +465,14 @@
mod tests {
use std::sync::Arc;
+ use crate::client::MockClient;
use crate::error::ErrorKind;
use crate::log::terminal_logger;
use crate::model::common::Route;
- use crate::model::message::{MessageBuilder, MessageImpl};
+ use crate::model::message::{MessageBuilder, MessageImpl, MessageType};
use crate::model::transaction::TransactionResolution;
- use crate::pb::{Broker, MessageQueue};
- use crate::session::Session;
+ use crate::pb::{Broker, Code, EndTransactionResponse, MessageQueue, Status};
+ use crate::session::{self, Session};
use super::*;
@@ -310,6 +481,18 @@
option: Default::default(),
logger: terminal_logger(),
client: Client::default(),
+ shutdown_tx: None,
+ transaction_checker: None,
+ }
+ }
+
+    // Builds a producer around the mocked client with a transaction checker
+    // that always answers COMMIT.
+    fn new_transaction_producer_for_test() -> Producer {
+        let always_commit: Box<TransactionChecker> =
+            Box::new(|_, _| TransactionResolution::COMMIT);
+        Producer {
+            option: Default::default(),
+            logger: terminal_logger(),
+            client: Client::default(),
+            shutdown_tx: None,
+            transaction_checker: Some(always_commit),
        }
    }
@@ -326,10 +509,16 @@
queue: vec![],
}))
});
- client.expect_start().returning(|| Ok(()));
+ client.expect_start().returning(|_| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());
+ client
+ .expect_get_session()
+ .return_once(|| Ok(Session::mock()));
+ client
+ .expect_get_endpoints()
+ .return_once(|| Endpoints::from_url("foobar.com:8080").unwrap());
Ok(client)
});
let mut producer_option = ProducerOption::default();
@@ -353,11 +542,16 @@
queue: vec![],
}))
});
- client.expect_start().returning(|| Ok(()));
- client.expect_set_transaction_checker().returning(|_| ());
+ client.expect_start().returning(|_| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());
+ client
+ .expect_get_session()
+ .return_once(|| Ok(Session::mock()));
+ client
+ .expect_get_endpoints()
+ .return_once(|| Endpoints::from_url("foobar.com:8080").unwrap());
Ok(client)
});
let mut producer_option = ProducerOption::default();
@@ -384,7 +578,7 @@
.set_message_group("message_group".to_string())
.build()
.unwrap()];
- let result = producer.transform_messages_to_protobuf(messages);
+ let result = producer.transform_messages_to_protobuf(messages).await;
assert!(result.is_ok());
let (topic, message_group, pb_messages) = result.unwrap();
@@ -408,7 +602,7 @@
let producer = new_producer_for_test();
let messages: Vec<MessageImpl> = vec![];
- let result = producer.transform_messages_to_protobuf(messages);
+ let result = producer.transform_messages_to_protobuf(messages).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
@@ -424,8 +618,9 @@
message_group: None,
delivery_timestamp: None,
transaction_enabled: false,
+ message_type: MessageType::TRANSACTION,
}];
- let result = producer.transform_messages_to_protobuf(messages);
+ let result = producer.transform_messages_to_protobuf(messages).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
@@ -443,7 +638,7 @@
.build()
.unwrap(),
];
- let result = producer.transform_messages_to_protobuf(messages);
+ let result = producer.transform_messages_to_protobuf(messages).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
@@ -463,7 +658,7 @@
.build()
.unwrap(),
];
- let result = producer.transform_messages_to_protobuf(messages);
+ let result = producer.transform_messages_to_protobuf(messages).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind, ErrorKind::InvalidMessage);
@@ -491,7 +686,7 @@
addresses: vec![],
}),
}),
- accept_message_types: vec![],
+ accept_message_types: vec![MessageType::NORMAL as i32],
}],
}))
});
@@ -520,7 +715,7 @@
#[tokio::test]
async fn producer_send_transaction_message() -> Result<(), ClientError> {
- let mut producer = new_producer_for_test();
+ let mut producer = new_transaction_producer_for_test();
producer.client.expect_topic_route().returning(|_, _| {
Ok(Arc::new(Route {
index: Default::default(),
@@ -539,7 +734,7 @@
addresses: vec![],
}),
}),
- accept_message_types: vec![],
+ accept_message_types: vec![MessageType::TRANSACTION as i32],
}],
}))
});
@@ -556,20 +751,46 @@
.client
.expect_get_session()
.return_once(|| Ok(Session::mock()));
- producer
- .client
- .expect_has_transaction_checker()
- .return_once(|| true);
let _ = producer
.send_transaction_message(
- MessageBuilder::builder()
- .set_topic("test_topic")
- .set_body(vec![])
+ MessageBuilder::transaction_message_builder("test_topic", vec![])
.build()
.unwrap(),
)
.await?;
Ok(())
}
+
+    // A well-formed orphaned-transaction command with a COMMIT checker must be
+    // resolved successfully against a mocked RPC client.
+    #[tokio::test]
+    async fn client_handle_recover_orphaned_transaction_command() {
+        // The mocked session answers the end-transaction call with an OK status.
+        let ok_status = Status {
+            code: Code::Ok as i32,
+            message: "".to_string(),
+        };
+        let response = Ok(EndTransactionResponse {
+            status: Some(ok_status),
+        });
+        let mut rpc_client = session::MockRPCClient::new();
+        rpc_client
+            .expect_end_transaction()
+            .return_once(|_| Box::pin(futures::future::ready(response)));
+
+        // The static status check on the mocked client is stubbed as well.
+        let status_check = MockClient::handle_response_status_context();
+        status_check.expect().return_once(|_, _| Result::Ok(()));
+
+        let orphaned = pb::RecoverOrphanedTransactionCommand {
+            message: Some(pb::Message {
+                topic: Some(Resource::default()),
+                user_properties: Default::default(),
+                system_properties: Some(SystemProperties::default()),
+                body: vec![],
+            }),
+            transaction_id: "".to_string(),
+        };
+        let checker: Option<Box<TransactionChecker>> =
+            Some(Box::new(|_, _| TransactionResolution::COMMIT));
+
+        let result = Producer::handle_recover_orphaned_transaction_command(
+            rpc_client,
+            orphaned,
+            &checker,
+            Endpoints::from_url("localhost:8081").unwrap(),
+        )
+        .await;
+        assert!(result.is_ok())
+    }
}
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index f8a6eac..e891d38 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -18,7 +18,9 @@
use std::time::Duration;
use mockall_double::double;
-use slog::{info, Logger};
+use slog::{info, warn, Logger};
+use tokio::select;
+use tokio::sync::{mpsc, oneshot};
#[double]
use crate::client::Client;
@@ -45,6 +47,7 @@
option: SimpleConsumerOption,
logger: Logger,
client: Client,
+ shutdown_tx: Option<oneshot::Sender<()>>,
}
impl SimpleConsumer {
@@ -78,6 +81,7 @@
option,
logger,
client,
+ shutdown_tx: None,
})
}
@@ -90,12 +94,29 @@
Self::OPERATION_START_SIMPLE_CONSUMER,
));
}
- self.client.start().await?;
+ let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
+ self.client.start(telemetry_command_tx).await?;
if let Some(topics) = self.option.topics() {
for topic in topics {
self.client.topic_route(topic, true).await?;
}
}
+ let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+ self.shutdown_tx = Some(shutdown_tx);
+ let logger = self.logger.clone();
+        // Background task: drains telemetry commands, which the simple consumer
+        // does not handle, until shutdown is signalled or the channel is closed.
+        tokio::spawn(async move {
+            loop {
+                select! {
+                    command = telemetry_command_rx.recv() => {
+                        match command {
+                            Some(command) => {
+                                warn!(logger, "command {:?} cannot be handled in simple consumer.", command);
+                            }
+                            // Every sender is gone; `recv` would return `None` immediately
+                            // on each iteration, so stop instead of spinning and logging.
+                            None => break,
+                        }
+                    }
+
+                    _ = &mut shutdown_rx => {
+                        break;
+                    }
+                }
+            }
+        });
info!(
self.logger,
"start simple consumer success, client_id: {}",
@@ -105,6 +126,9 @@
}
pub async fn shutdown(self) -> Result<(), ClientError> {
+ if let Some(shutdown_tx) = self.shutdown_tx {
+ let _ = shutdown_tx.send(());
+ };
self.client.shutdown().await
}
@@ -215,7 +239,7 @@
queue: vec![],
}))
});
- client.expect_start().returning(|| Ok(()));
+ client.expect_start().returning(|_| Ok(()));
client
.expect_client_id()
.return_const("fake_id".to_string());
@@ -272,6 +296,7 @@
option: SimpleConsumerOption::default(),
logger: terminal_logger(),
client,
+ shutdown_tx: None,
};
let messages = simple_consumer