| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| //! Transaction data model of RocketMQ rust client. |
| |
| use std::fmt::{Debug, Formatter}; |
| |
| use async_trait::async_trait; |
| |
| use crate::client::Client; |
| use crate::error::ClientError; |
| use crate::model::common::SendReceipt; |
| use crate::model::message::MessageView; |
| use crate::pb::{EndTransactionRequest, Resource, TransactionSource}; |
| use crate::session::RPCClient; |
| |
| /// An entity to describe an independent transaction. |
| /// |
| /// Once the request of commit of roll-back reached server, subsequently arrived commit or roll-back request in |
| /// [`Transaction`] would be ignored by the server. |
| /// |
| /// If transaction is not commit/roll-back in time, it is suspended until it is solved by [`TransactionChecker`] |
| /// or reach the end of life. |
| #[async_trait] |
| pub trait Transaction { |
| /// Try to commit the transaction, which would expose the message before the transaction is closed if no exception thrown. |
| /// What you should pay more attention to is that the commitment may be successful even exception is thrown. |
| async fn commit(self) -> Result<(), ClientError>; |
| |
| /// Try to roll back the transaction, which would expose the message before the transaction is closed if no exception thrown. |
| /// What you should pay more attention to is that the roll-back may be successful even exception is thrown. |
| async fn rollback(self) -> Result<(), ClientError>; |
| |
| /// Get message id |
| fn message_id(&self) -> &str; |
| |
| /// Get transaction id |
| fn transaction_id(&self) -> &str; |
| } |
| |
| pub(crate) struct TransactionImpl { |
| rpc_client: Box<dyn RPCClient + Send + Sync>, |
| topic: Resource, |
| send_receipt: SendReceipt, |
| } |
| |
| impl Debug for TransactionImpl { |
| fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { |
| f.debug_struct("TransactionImpl") |
| .field("transaction_id", &self.send_receipt.transaction_id()) |
| .field("message_id", &self.send_receipt.message_id()) |
| .finish() |
| } |
| } |
| |
| impl TransactionImpl { |
| pub(crate) fn new( |
| rpc_client: Box<dyn RPCClient + Send + Sync>, |
| topic: Resource, |
| send_receipt: SendReceipt, |
| ) -> TransactionImpl { |
| TransactionImpl { |
| rpc_client, |
| topic, |
| send_receipt, |
| } |
| } |
| |
| async fn end_transaction( |
| mut self, |
| resolution: TransactionResolution, |
| ) -> Result<(), ClientError> { |
| let response = self |
| .rpc_client |
| .end_transaction(EndTransactionRequest { |
| topic: Some(self.topic), |
| message_id: self.send_receipt.message_id().to_string(), |
| transaction_id: self.send_receipt.transaction_id().to_string(), |
| resolution: resolution as i32, |
| source: TransactionSource::SourceClient as i32, |
| trace_context: "".to_string(), |
| }) |
| .await?; |
| Client::handle_response_status(response.status, "end transaction") |
| } |
| } |
| |
| #[async_trait] |
| impl Transaction for TransactionImpl { |
| async fn commit(mut self) -> Result<(), ClientError> { |
| return self.end_transaction(TransactionResolution::COMMIT).await; |
| } |
| |
| async fn rollback(mut self) -> Result<(), ClientError> { |
| return self.end_transaction(TransactionResolution::ROLLBACK).await; |
| } |
| |
| fn message_id(&self) -> &str { |
| self.send_receipt.message_id() |
| } |
| |
| fn transaction_id(&self) -> &str { |
| self.send_receipt.transaction_id() |
| } |
| } |
| |
| /// Resolution of Transaction. |
| #[repr(i32)] |
| pub enum TransactionResolution { |
| /// Notify server that current transaction should be committed. |
| COMMIT = 1, |
| /// Notify server that current transaction should be roll-backed. |
| ROLLBACK = 2, |
| /// Notify server that the state of this transaction is not sure. You should be cautious before return unknown |
| /// because the examination from the server will be performed periodically. |
| UNKNOWN = 0, |
| } |
| |
| /// A closure to check the state of transaction. |
| /// RocketMQ Server will call producer periodically to check the state of uncommitted transaction. |
| /// |
| /// # Arguments |
| /// |
| /// * transaction id |
| /// * message |
| pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync; |
| |
| #[cfg(test)] |
| mod tests { |
| use crate::error::ClientError; |
| use crate::model::common::SendReceipt; |
| use crate::model::transaction::{Transaction, TransactionImpl}; |
| use crate::pb::{Code, EndTransactionResponse, Resource, SendResultEntry, Status}; |
| use crate::session; |
| |
| #[tokio::test] |
| async fn transaction_commit() -> Result<(), ClientError> { |
| let mut mock = session::MockRPCClient::new(); |
| mock.expect_end_transaction().return_once(|_| { |
| Box::pin(futures::future::ready(Ok(EndTransactionResponse { |
| status: Some(Status { |
| code: Code::Ok as i32, |
| message: "".to_string(), |
| }), |
| }))) |
| }); |
| let transaction = TransactionImpl::new( |
| Box::new(mock), |
| Resource { |
| resource_namespace: "".to_string(), |
| name: "".to_string(), |
| }, |
| SendReceipt::from_pb_send_result(&SendResultEntry { |
| status: None, |
| message_id: "".to_string(), |
| transaction_id: "".to_string(), |
| offset: 0, |
| }), |
| ); |
| transaction.commit().await |
| } |
| |
| #[tokio::test] |
| async fn transaction_rollback() -> Result<(), ClientError> { |
| let mut mock = session::MockRPCClient::new(); |
| mock.expect_end_transaction().return_once(|_| { |
| Box::pin(futures::future::ready(Ok(EndTransactionResponse { |
| status: Some(Status { |
| code: Code::Ok as i32, |
| message: "".to_string(), |
| }), |
| }))) |
| }); |
| let transaction = TransactionImpl::new( |
| Box::new(mock), |
| Resource { |
| resource_namespace: "".to_string(), |
| name: "".to_string(), |
| }, |
| SendReceipt::from_pb_send_result(&SendResultEntry { |
| status: None, |
| message_id: "".to_string(), |
| transaction_id: "".to_string(), |
| offset: 0, |
| }), |
| ); |
| transaction.rollback().await |
| } |
| } |