IGNITE-14204 Fix C++ thin transactions
This closes #8836
diff --git a/modules/platforms/cpp/thin-client-test/src/tx_test.cpp b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp
index 19757a2..04422ad 100644
--- a/modules/platforms/cpp/thin-client-test/src/tx_test.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp
@@ -39,7 +39,8 @@
public:
IgniteTxTestSuiteFixture()
{
- serverNode = ignite_test::StartCrossPlatformServerNode("cache.xml", "ServerNode");
+ node1 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node1");
+ node2 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node2");
}
~IgniteTxTestSuiteFixture()
@@ -47,9 +48,24 @@
ignite::Ignition::StopAll(false);
}
+ /**
+ * Start client.
+ */
+ static IgniteClient StartClient()
+ {
+ IgniteClientConfiguration cfg;
+
+ cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111");
+
+ return IgniteClient::Start(cfg);
+ }
+
private:
- /** Server node. */
- ignite::Ignite serverNode;
+ /** Server node #1. */
+ ignite::Ignite node1;
+
+ /** Server node #2. */
+ ignite::Ignite node2;
};
BOOST_FIXTURE_TEST_SUITE(IgniteTxTestSuite, IgniteTxTestSuiteFixture)
@@ -75,11 +91,7 @@
BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx)
{
- IgniteClientConfiguration cfg;
-
- cfg.SetEndPoints("127.0.0.1:11110");
-
- IgniteClient client = IgniteClient::Start(cfg);
+ IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
@@ -190,7 +202,7 @@
tx.Rollback();
- BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::ALL), 1);
+ BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::PRIMARY), 1);
//---
@@ -241,11 +253,7 @@
void startAnotherClientAndTx(SharedPointer<SingleLatch>& l)
{
- IgniteClientConfiguration cfg;
-
- cfg.SetEndPoints("127.0.0.1:11110");
-
- IgniteClient client = IgniteClient::Start(cfg);
+ IgniteClient client = IgniteTxTestSuiteFixture::StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
@@ -263,11 +271,7 @@
BOOST_AUTO_TEST_CASE(TestTxOps)
{
- IgniteClientConfiguration cfg;
-
- cfg.SetEndPoints("127.0.0.1:11110");
-
- IgniteClient client = IgniteClient::Start(cfg);
+ IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
@@ -347,11 +351,7 @@
BOOST_AUTO_TEST_CASE(TestTxWithLabel)
{
- IgniteClientConfiguration cfg;
-
- cfg.SetEndPoints("127.0.0.1:11110");
-
- IgniteClient client = IgniteClient::Start(cfg);
+ IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
@@ -397,4 +397,41 @@
tx.Close();
}
+BOOST_AUTO_TEST_CASE(ManyTransactions)
+{
+ IgniteClient client = StartClient();
+
+ cache::CacheClient<int, int> cache =
+ client.GetCache<int, int>("partitioned");
+
+ transactions::ClientTransactions transactions = client.ClientTransactions();
+ const int32_t key = 42;
+
+ for (int32_t val = 0; val < 100; ++val) {
+ transactions::ClientTransaction tx = transactions.TxStart();
+
+ cache.Put(key, val);
+
+ tx.Commit();
+
+ BOOST_CHECK_EQUAL(val, cache.Get(key));
+ }
+
+ const int32_t expected = -42;
+
+ cache.Put(key, expected);
+
+ BOOST_CHECK_EQUAL(expected, cache.Get(key));
+
+ for (int32_t val = 0; val < 100; ++val) {
+ transactions::ClientTransaction tx = transactions.TxStart();
+
+ cache.Put(key, val);
+
+ tx.Rollback();
+
+ BOOST_CHECK_EQUAL(expected, cache.Get(key));
+ }
+}
+
BOOST_AUTO_TEST_SUITE_END()
diff --git a/modules/platforms/cpp/thin-client/CMakeLists.txt b/modules/platforms/cpp/thin-client/CMakeLists.txt
index 059b012..bc1fbeb 100644
--- a/modules/platforms/cpp/thin-client/CMakeLists.txt
+++ b/modules/platforms/cpp/thin-client/CMakeLists.txt
@@ -34,6 +34,7 @@
src/impl/message.cpp
src/impl/cache/cache_client_proxy.cpp
src/impl/cache/cache_client_impl.cpp
+ src/impl/transactions/transaction_impl.cpp
src/impl/transactions/transactions_impl.cpp
src/impl/transactions/transactions_proxy.cpp
src/ignite_client.cpp
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
index 4d60182..d5cdc6f 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj
@@ -169,6 +169,7 @@
<ClCompile Include="..\..\src\impl\protocol_version.cpp" />
<ClCompile Include="..\..\src\impl\remote_type_updater.cpp" />
<ClCompile Include="..\..\src\impl\utility.cpp" />
+ <ClCompile Include="..\..\src\impl\transactions\transaction_impl.cpp" />
<ClCompile Include="..\..\src\impl\transactions\transactions_impl.cpp" />
<ClCompile Include="..\..\src\impl\transactions\transactions_proxy.cpp" />
</ItemGroup>
diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
index 263325c..c9a092a 100644
--- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
+++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters
@@ -67,6 +67,15 @@
<ClCompile Include="..\..\src\impl\affinity\affinity_manager.cpp">
<Filter>Code\impl\affinity</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\impl\transactions\transaction_impl.cpp">
+ <Filter>Code\impl\transactions</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\src\impl\transactions\transactions_impl.cpp">
+ <Filter>Code\impl\transactions</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\src\impl\transactions\transactions_proxy.cpp">
+ <Filter>Code\impl\transactions</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\ignite\thin\ignite_client.h">
@@ -174,5 +183,20 @@
<ClInclude Include="..\..\src\impl\affinity\partition_awareness_group.h">
<Filter>Code\impl\affinity</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\ignite\thin\transactions\transactions.h">
+ <Filter>Code\transactions</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\thin\transactions\transaction.h">
+ <Filter>Code\transactions</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\thin\transactions\transaction_consts.h">
+ <Filter>Code\transactions</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\include\ignite\thin\transactions\transactions_proxy.h">
+ <Filter>Code\transactions</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\src\impl\transactions\transactions_impl.h">
+ <Filter>Code\impl\transactions</Filter>
+ </ClInclude>
</ItemGroup>
</Project>
\ No newline at end of file
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
index 51b0c28..d66b9e6 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp
@@ -101,71 +101,79 @@
return channel;
}
- template<typename ReqT>
- void CacheClientImpl::checkTransactional(ReqT& req)
+ template<typename ReqT, typename RspT>
+ void CacheClientImpl::TransactionalSyncCacheKeyMessage(const WritableKey &key, ReqT &req,
+ RspT &rsp)
{
- SP_TransactionImpl activeTx = tx.Get()->GetCurrent();
+ if (!TryProcessTransactional(req, rsp))
+ SyncCacheKeyMessage(key, req, rsp);
+ }
- bool isUnderTx = activeTx.IsValid();
+ template<typename ReqT, typename RspT>
+ void CacheClientImpl::TransactionalSyncMessage(ReqT &req, RspT &rsp)
+ {
+ if (!TryProcessTransactional(req, rsp))
+ SyncMessage(req, rsp);
+ }
- int32_t txId = isUnderTx ? activeTx.Get()->TxId() : 0;
+ template<typename ReqT, typename RspT>
+ bool CacheClientImpl::TryProcessTransactional(ReqT& req, RspT& rsp)
+ {
+ TransactionImpl* activeTx = tx.Get()->GetCurrent().Get();
- req.activeTx(isUnderTx, txId);
+ if (!activeTx)
+ return false;
+
+ req.activeTx(true, activeTx->TxId());
+
+ SP_DataChannel channel = activeTx->GetChannel();
+
+ channel.Get()->SyncMessage(req, rsp, router.Get()->GetIoTimeout());
+
+ if (rsp.GetStatus() != ResponseStatus::SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str());
+
+ return true;
}
void CacheClientImpl::Put(const WritableKey& key, const Writable& value)
{
Cache2ValueRequest<RequestType::CACHE_PUT> req(id, binary, key, value);
-
- checkTransactional(req);
-
Response rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::Get(const WritableKey& key, Readable& value)
{
CacheValueRequest<RequestType::CACHE_GET> req(id, binary, key);
-
- checkTransactional(req);
-
CacheValueResponse rsp(value);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::PutAll(const Writable & pairs)
{
CacheValueRequest<RequestType::CACHE_PUT_ALL> req(id, binary, pairs);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::GetAll(const Writable& keys, Readable& pairs)
{
CacheValueRequest<RequestType::CACHE_GET_ALL> req(id, binary, keys);
-
- checkTransactional(req);
-
CacheValueResponse rsp(pairs);
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
bool CacheClientImpl::Replace(const WritableKey& key, const Writable& value)
{
Cache2ValueRequest<RequestType::CACHE_REPLACE> req(id, binary, key, value);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -173,12 +181,9 @@
bool CacheClientImpl::ContainsKey(const WritableKey& key)
{
CacheValueRequest<RequestType::CACHE_CONTAINS_KEY> req(id, binary, key);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -186,12 +191,9 @@
bool CacheClientImpl::ContainsKeys(const Writable& keys)
{
CacheValueRequest<RequestType::CACHE_CONTAINS_KEYS> req(id, binary, keys);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
return rsp.GetValue();
}
@@ -199,12 +201,9 @@
int64_t CacheClientImpl::GetSize(int32_t peekModes)
{
CacheGetSizeRequest req(id, binary, peekModes);
-
- checkTransactional(req);
-
Int64Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
return rsp.GetValue();
}
@@ -212,12 +211,9 @@
bool CacheClientImpl::Remove(const WritableKey& key)
{
CacheValueRequest<RequestType::CACHE_REMOVE_KEY> req(id, binary, key);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -225,12 +221,9 @@
bool CacheClientImpl::Remove(const WritableKey& key, const Writable& val)
{
Cache2ValueRequest<RequestType::CACHE_REMOVE_IF_EQUALS> req(id, binary, key, val);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -238,78 +231,57 @@
void CacheClientImpl::RemoveAll(const Writable& keys)
{
CacheValueRequest<RequestType::CACHE_REMOVE_KEYS> req(id, binary, keys);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::RemoveAll()
{
CacheRequest<RequestType::CACHE_REMOVE_ALL> req(id, binary);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::Clear(const WritableKey& key)
{
CacheValueRequest<RequestType::CACHE_CLEAR_KEY> req(id, binary, key);
-
- checkTransactional(req);
-
Response rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::Clear()
{
CacheRequest<RequestType::CACHE_CLEAR> req(id, binary);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::ClearAll(const Writable& keys)
{
CacheValueRequest<RequestType::CACHE_CLEAR_KEYS> req(id, binary, keys);
-
- checkTransactional(req);
-
Response rsp;
- SyncMessage(req, rsp);
+ TransactionalSyncMessage(req, rsp);
}
void CacheClientImpl::LocalPeek(const WritableKey& key, Readable& value)
{
CacheValueRequest<RequestType::CACHE_LOCAL_PEEK> req(id, binary, key);
-
- checkTransactional(req);
-
CacheValueResponse rsp(value);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
bool CacheClientImpl::Replace(const WritableKey& key, const Writable& oldVal, const Writable& newVal)
{
Cache3ValueRequest<RequestType::CACHE_REPLACE_IF_EQUALS> req(id, binary, key, oldVal, newVal);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -317,45 +289,33 @@
void CacheClientImpl::GetAndPut(const WritableKey& key, const Writable& valIn, Readable& valOut)
{
Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT> req(id, binary, key, valIn);
-
- checkTransactional(req);
-
CacheValueResponse rsp(valOut);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::GetAndRemove(const WritableKey& key, Readable& valOut)
{
CacheValueRequest<RequestType::CACHE_GET_AND_REMOVE> req(id, binary, key);
-
- checkTransactional(req);
-
CacheValueResponse rsp(valOut);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
void CacheClientImpl::GetAndReplace(const WritableKey& key, const Writable& valIn, Readable& valOut)
{
Cache2ValueRequest<RequestType::CACHE_GET_AND_REPLACE> req(id, binary, key, valIn);
-
- checkTransactional(req);
-
CacheValueResponse rsp(valOut);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
bool CacheClientImpl::PutIfAbsent(const WritableKey& key, const Writable& val)
{
Cache2ValueRequest<RequestType::CACHE_PUT_IF_ABSENT> req(id, binary, key, val);
-
- checkTransactional(req);
-
BoolResponse rsp;
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
return rsp.GetValue();
}
@@ -363,12 +323,9 @@
void CacheClientImpl::GetAndPutIfAbsent(const WritableKey& key, const Writable& valIn, Readable& valOut)
{
Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT_IF_ABSENT> req(id, binary, key, valIn);
-
- checkTransactional(req);
-
CacheValueResponse rsp(valOut);
- SyncCacheKeyMessage(key, req, rsp);
+ TransactionalSyncCacheKeyMessage(key, req, rsp);
}
query::SP_QueryFieldsCursorImpl CacheClientImpl::Query(
diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
index d74ad29..4efefd7 100644
--- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h
@@ -310,9 +310,6 @@
template<typename ReqT, typename RspT>
void SyncCacheKeyMessage(const WritableKey& key, const ReqT& req, RspT& rsp);
- template<typename ReqT>
- void checkTransactional(ReqT& req);
-
/**
* Synchronously send message and receive response.
*
@@ -324,6 +321,40 @@
template<typename ReqT, typename RspT>
SP_DataChannel SyncMessage(const ReqT& req, RspT& rsp);
+ /**
+ * Synchronously send request message and receive response taking in account that it can be
+ * transactional.
+ *
+ * @param key Key.
+ * @param req Request message.
+ * @param rsp Response message.
+ * @throw IgniteError on error.
+ */
+ template<typename ReqT, typename RspT>
+ void TransactionalSyncCacheKeyMessage(const WritableKey& key, ReqT& req, RspT& rsp);
+
+ /**
+ * Synchronously send message and receive response taking in account that it can be transactional.
+ *
+ * @param req Request message.
+ * @param rsp Response message.
+ * @return Channel that was used for request.
+ * @throw IgniteError on error.
+ */
+ template<typename ReqT, typename RspT>
+ void TransactionalSyncMessage(ReqT& req, RspT& rsp);
+
+ /***
+ * Check whether request is transactional and process it if it is.
+ * @tparam ReqT Request type.
+ * @tparam RspT Response type.
+ * @param req Request.
+ * @param rsp Response.
+ * @return @c true if processed and false otherwise.
+ */
+ template<typename ReqT, typename RspT>
+ bool TryProcessTransactional(ReqT& req, RspT& rsp);
+
/** Data router. */
SP_DataRouter router;
diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.h b/modules/platforms/cpp/thin-client/src/impl/data_router.h
index baa74ac..701f710 100644
--- a/modules/platforms/cpp/thin-client/src/impl/data_router.h
+++ b/modules/platforms/cpp/thin-client/src/impl/data_router.h
@@ -208,6 +208,16 @@
*/
affinity::SP_AffinityAssignment GetAffinityAssignment(int32_t cacheId) const;
+ /**
+ * Get IO timeout.
+ *
+ * @return IO timeout.
+ */
+ int32_t GetIoTimeout()
+ {
+ return ioTimeout;
+ }
+
private:
IGNITE_NO_COPY_ASSIGNMENT(DataRouter);
diff --git a/modules/platforms/cpp/thin-client/src/impl/message.h b/modules/platforms/cpp/thin-client/src/impl/message.h
index 80e5437..a7220d8 100644
--- a/modules/platforms/cpp/thin-client/src/impl/message.h
+++ b/modules/platforms/cpp/thin-client/src/impl/message.h
@@ -363,7 +363,8 @@
CacheRequest(int32_t cacheId, bool binary) :
cacheId(cacheId),
binary(binary),
- actTx(false)
+ actTx(false),
+ txId(0)
{
// No-op.
}
@@ -536,15 +537,6 @@
}
/**
- * Sets transaction active flag and appropriate txId.
- * @param active Transaction activity flag.
- * @param id Transaction id.
- */
- void activeTx(bool active, int32_t id) {
- CacheRequest<OpCode>::activeTx(active, id);
- }
-
- /**
* Write request using provided writer.
* @param writer Writer.
* @param ver Version.
@@ -691,11 +683,11 @@
* Constructor.
*
* @param id Transaction id.
- * @param comm Need to commit flag.
+ * @param commit Need to commit flag.
*/
- TxEndRequest(int32_t id, bool comm) :
+ TxEndRequest(int32_t id, bool commit) :
txId(id),
- commited(comm)
+ commited(commit)
{
// No-op.
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp
new file mode 100644
index 0000000..6fd9115
--- /dev/null
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "impl/message.h"
+#include "impl/transactions/transaction_impl.h"
+#include "impl/transactions/transactions_impl.h"
+#include "impl/response_status.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::impl::thin;
+using namespace ignite::thin::transactions;
+
+namespace ignite
+{
+ namespace impl
+ {
+ namespace thin
+ {
+ namespace transactions
+ {
+ template<typename ReqT, typename RspT>
+ void TransactionImpl::SendTxMessage(const ReqT& req, RspT& rsp)
+ {
+ channel.Get()->SyncMessage(req, rsp, static_cast<int32_t>(timeout / 1000) + ioTimeout);
+
+ if (rsp.GetStatus() != ResponseStatus::SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
+ }
+
+ SP_TransactionImpl TransactionImpl::Create(
+ TransactionsImpl& txs,
+ SP_DataRouter& router,
+ TransactionConcurrency::Type concurrency,
+ TransactionIsolation::Type isolation,
+ int64_t timeout,
+ int32_t txSize,
+ SharedPointer<common::FixedSizeArray<char> > label)
+ {
+ SP_TransactionImpl tx = txs.GetCurrent();
+
+ TransactionImpl* ptr = tx.Get();
+
+ if (ptr && !ptr->IsClosed())
+ throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED);
+
+ TxStartRequest req(concurrency, isolation, timeout, label);
+
+ Int32Response rsp;
+
+ SP_DataChannel channel = router.Get()->SyncMessage(req, rsp);
+
+ if (rsp.GetStatus() != ResponseStatus::SUCCESS)
+ throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
+
+ int32_t curTxId = rsp.GetValue();
+
+ tx = SP_TransactionImpl(new TransactionImpl(txs, channel, curTxId, concurrency,
+ isolation, timeout, router.Get()->GetIoTimeout(), txSize));
+
+ txs.SetCurrent(tx);
+
+ return tx;
+ }
+
+ bool TransactionImpl::IsClosed() const
+ {
+ return closed;
+ }
+
+ void TransactionImpl::Commit()
+ {
+ ThreadCheck();
+
+ TxEndRequest req(txId, true);
+
+ Response rsp;
+
+ SendTxMessage(req, rsp);
+
+ ThreadEnd();
+ }
+
+ void TransactionImpl::Rollback()
+ {
+ ThreadCheck();
+
+ TxEndRequest req(txId, false);
+
+ Response rsp;
+
+ SendTxMessage(req, rsp);
+
+ ThreadEnd();
+ }
+
+ void TransactionImpl::Close()
+ {
+ ThreadCheck();
+
+ if (IsClosed())
+ {
+ return;
+ }
+
+ Rollback();
+
+ ThreadEnd();
+ }
+
+ void TransactionImpl::SetClosed()
+ {
+ closed = true;
+ }
+
+ void TransactionImpl::ThreadEnd()
+ {
+ this->SetClosed();
+
+ txs.ResetCurrent();
+ }
+
+ void TransactionImpl::ThreadCheck()
+ {
+ SP_TransactionImpl tx = txs.GetCurrent();
+
+ TransactionImpl* ptr = tx.Get();
+
+ if (!ptr)
+ throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_CLOSED);
+
+ if (ptr->TxId() != this->TxId())
+ throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_DIFFERENT_THREAD);
+ }
+ }
+ }
+ }
+}
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h
index b6f9aa1..e395b3b 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h
@@ -18,10 +18,10 @@
#ifndef _IGNITE_IMPL_THIN_TRANSACTION_IMPL
#define _IGNITE_IMPL_THIN_TRANSACTION_IMPL
-#include "impl/data_router.h"
#include <ignite/common/fixed_size_array.h>
-#include "ignite/thin/transactions/transaction_consts.h"
-#include "impl/transactions/transactions_impl.h"
+#include <ignite/thin/transactions/transaction_consts.h>
+
+#include "impl/data_router.h"
namespace ignite
{
@@ -51,24 +51,30 @@
* Constructor.
*
* @param txImpl Transactions implementation.
- * @param txid Transaction Id.
+ * @param channel Channel linked to transaction.
+ * @param txId Transaction Id.
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolation.
* @param timeout Transaction timeout.
+ * @param ioTimeout IO timeout for channel.
* @param size Number of entries participating in transaction (may be approximate).
*/
TransactionImpl(
TransactionsImpl& txImpl,
- int32_t txid,
+ SP_DataChannel channel,
+ int32_t txId,
ignite::thin::transactions::TransactionConcurrency::Type concurrency,
ignite::thin::transactions::TransactionIsolation::Type isolation,
int64_t timeout,
+ int32_t ioTimeout,
int32_t size) :
+ channel(channel),
txs(txImpl),
- txId(txid),
+ txId(txId),
concurrency(concurrency),
isolation(isolation),
timeout(timeout),
+ ioTimeout(ioTimeout),
txSize(size),
closed(false)
{
@@ -123,6 +129,7 @@
* Starts transaction.
*
* @param txs Transactions implementation.
+ * @param router Router to use to start transaction.
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolation.
* @param timeout Transaction timeout.
@@ -131,19 +138,43 @@
*/
static SP_TransactionImpl Create(
TransactionsImpl& txs,
+ SP_DataRouter& router,
ignite::thin::transactions::TransactionConcurrency::Type concurrency,
ignite::thin::transactions::TransactionIsolation::Type isolation,
int64_t timeout,
int32_t txSize,
ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label);
- protected:
+
+ /**
+ * Get channel for the transaction.
+ *
+ * @return Channel.
+ */
+ SP_DataChannel GetChannel()
+ {
+ return channel;
+ }
+
+ private:
/** Checks current thread state. */
void ThreadCheck();
/** Completes tc and clear state from storage. */
void ThreadEnd();
- private:
+ /**
+ * Synchronously send message and receive response.
+ *
+ * @param req Request message.
+ * @param rsp Response message.
+ * @throw IgniteError on error.
+ */
+ template<typename ReqT, typename RspT>
+ void SendTxMessage(const ReqT& req, RspT& rsp);
+
+ /** Data channel to use. */
+ SP_DataChannel channel;
+
/** Transactions implementation. */
TransactionsImpl& txs;
@@ -159,6 +190,9 @@
/** Timeout in milliseconds. */
int64_t timeout;
+ /** Channel io timeout. */
+ int32_t ioTimeout;
+
/** Transaction size. */
int32_t txSize;
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
index d785932..6227add 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp
@@ -15,10 +15,7 @@
* limitations under the License.
*/
-#include "impl/message.h"
#include "impl/transactions/transactions_impl.h"
-#include "impl/transactions/transaction_impl.h"
-#include "impl/response_status.h"
using namespace ignite::common::concurrent;
using namespace ignite::impl::thin;
@@ -38,15 +35,6 @@
// No-op.
}
- template<typename ReqT, typename RspT>
- void TransactionsImpl::SendTxMessage(const ReqT& req, RspT& rsp)
- {
- router.Get()->SyncMessage(req, rsp);
-
- if (rsp.GetStatus() != ResponseStatus::SUCCESS)
- throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
- }
-
SharedPointer<TransactionImpl> TransactionsImpl::TxStart(
TransactionConcurrency::Type concurrency,
TransactionIsolation::Type isolation,
@@ -54,48 +42,11 @@
int32_t txSize,
SharedPointer<common::FixedSizeArray<char> > label)
{
- SP_TransactionImpl tx = TransactionImpl::Create(*this, concurrency, isolation, timeout, txSize, label);
+ SP_TransactionImpl tx = TransactionImpl::Create(*this, router, concurrency, isolation, timeout, txSize, label);
return tx;
}
- SP_TransactionImpl TransactionImpl::Create(
- TransactionsImpl& txs,
- TransactionConcurrency::Type concurrency,
- TransactionIsolation::Type isolation,
- int64_t timeout,
- int32_t txSize,
- SharedPointer<common::FixedSizeArray<char> > label)
- {
- SP_TransactionImpl tx = txs.GetCurrent();
-
- TransactionImpl* ptr = tx.Get();
-
- if (ptr && !ptr->IsClosed())
- {
- throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED);
- }
-
- TxStartRequest req(concurrency, isolation, timeout, label);
-
- Int32Response rsp;
-
- txs.SendTxMessage(req, rsp);
-
- int32_t curTxId = rsp.GetValue();
-
- tx = SP_TransactionImpl(new TransactionImpl(txs, curTxId, concurrency, isolation, timeout, txSize));
-
- txs.SetCurrent(tx);
-
- return tx;
- }
-
- bool TransactionImpl::IsClosed() const
- {
- return closed;
- }
-
SP_TransactionImpl TransactionsImpl::GetCurrent()
{
SP_TransactionImpl tx = threadTx.Get();
@@ -121,90 +72,6 @@
{
threadTx.Remove();
}
-
- int32_t TransactionsImpl::TxCommit(int32_t txId)
- {
- TxEndRequest req(txId, true);
-
- Response rsp;
-
- SendTxMessage(req, rsp);
-
- return rsp.GetStatus();
- }
-
- int32_t TransactionsImpl::TxRollback(int32_t txId)
- {
- TxEndRequest req(txId, false);
-
- Response rsp;
-
- SendTxMessage(req, rsp);
-
- return rsp.GetStatus();
- }
-
- int32_t TransactionsImpl::TxClose(int32_t txId)
- {
- return TxRollback(txId);
- }
-
- void TransactionImpl::Commit()
- {
- ThreadCheck();
-
- txs.TxCommit(txId);
-
- ThreadEnd();
- }
-
- void TransactionImpl::Rollback()
- {
- ThreadCheck();
-
- txs.TxRollback(txId);
-
- ThreadEnd();
- }
-
- void TransactionImpl::Close()
- {
- ThreadCheck();
-
- if (IsClosed())
- {
- return;
- }
-
- txs.TxClose(txId);
-
- ThreadEnd();
- }
-
- void TransactionImpl::SetClosed()
- {
- closed = true;
- }
-
- void TransactionImpl::ThreadEnd()
- {
- this->SetClosed();
-
- txs.ResetCurrent();
- }
-
- void TransactionImpl::ThreadCheck()
- {
- SP_TransactionImpl tx = txs.GetCurrent();
-
- TransactionImpl* ptr = tx.Get();
-
- if (!ptr)
- throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_CLOSED);
-
- if (ptr->TxId() != this->TxId())
- throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_DIFFERENT_THREAD);
- }
}
}
}
diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h
index 278545d..a0879fb 100644
--- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h
+++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h
@@ -18,9 +18,10 @@
#ifndef _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL
#define _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL
-#include "impl/data_router.h"
#include <ignite/common/fixed_size_array.h>
-#include "ignite/thin/transactions/transaction_consts.h"
+#include <ignite/thin/transactions/transaction_consts.h>
+
+#include "impl/data_router.h"
#include "impl/transactions/transaction_impl.h"
namespace ignite
@@ -31,11 +32,6 @@
{
namespace transactions
{
- class TransactionsImpl;
-
- typedef ignite::common::concurrent::SharedPointer<TransactionImpl> SP_TransactionImpl;
- typedef ignite::common::concurrent::SharedPointer<TransactionsImpl> SP_TransactionsImpl;
-
/**
* Thin client transaction.
*/
@@ -72,32 +68,6 @@
ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label);
/**
- * Commit Transaction.
- *
- * @param id Transaction ID.
- * @return Resulting state.
- */
- int32_t TxCommit(int32_t id);
-
- /**
- * Rollback Transaction.
- *
- * @param id Transaction ID.
- * @return Resulting state.
- */
- int32_t TxRollback(int32_t id);
-
-
- /**
- * Close the transaction.
- *
- * This method should only be used on the valid instance.
- *
- * @param id Transaction ID.
- */
- int32_t TxClose(int32_t id);
-
- /**
* Get active transaction for the current thread.
*
* @return Active transaction implementation for current thread
@@ -118,16 +88,6 @@
*/
void ResetCurrent();
- /**
- * Synchronously send message and receive response.
- *
- * @param req Request message.
- * @param rsp Response message.
- * @throw IgniteError on error.
- */
- template<typename ReqT, typename RspT>
- void SendTxMessage(const ReqT& req, RspT& rsp);
-
private:
/** Data router. */
SP_DataRouter router;
@@ -137,6 +97,8 @@
IGNITE_NO_COPY_ASSIGNMENT(TransactionsImpl);
};
+
+ typedef ignite::common::concurrent::SharedPointer<TransactionsImpl> SP_TransactionsImpl;
}
}
}