blob: 04422ad6cb29a426fbb3cd68eb8d6d60d0005c6e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/test/unit_test.hpp>
#include <ignite/ignition.h>
#include "ignite/thin/ignite_client_configuration.h"
#include "ignite/thin/ignite_client.h"
#include "ignite/thin/cache/cache_peek_mode.h"
#include "ignite/thin/transactions/transaction_consts.h"
#include <test_utils.h>
#include <ignite/ignite_error.h>
using namespace ignite::thin;
using namespace boost::unit_test;
using namespace ignite::thin::transactions;
using namespace ignite::common::concurrent;
class IgniteTxTestSuiteFixture
{
public:
IgniteTxTestSuiteFixture()
{
node1 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node1");
node2 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node2");
}
~IgniteTxTestSuiteFixture()
{
ignite::Ignition::StopAll(false);
}
/**
* Start client.
*/
static IgniteClient StartClient()
{
IgniteClientConfiguration cfg;
cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111");
return IgniteClient::Start(cfg);
}
private:
/** Server node #1. */
ignite::Ignite node1;
/** Server node #2. */
ignite::Ignite node2;
};
BOOST_FIXTURE_TEST_SUITE(IgniteTxTestSuite, IgniteTxTestSuiteFixture)
bool correctCloseMessage(const ignite::IgniteError& ex)
{
BOOST_CHECK_EQUAL(ex.what(), std::string(TX_ALREADY_CLOSED));
return true;
}
bool separateThreadMessage(const ignite::IgniteError& ex)
{
BOOST_CHECK_EQUAL(ex.what(), std::string(TX_DIFFERENT_THREAD));
return true;
}
bool checkTxTimeoutMessage(const ignite::IgniteError& ex)
{
return std::string(ex.what()).find("Cache transaction timed out") != std::string::npos;
}
BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx)
{
IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
cache.Put(1, 1);
transactions::ClientTransactions transactions = client.ClientTransactions();
transactions::ClientTransaction tx = transactions.TxStart();
cache.Put(1, 10);
BOOST_CHECK_EQUAL(10, cache.Get(1));
tx.Rollback();
BOOST_CHECK_EQUAL(1, cache.Get(1));
//---
tx = transactions.TxStart();
cache.Put(1, 10);
tx.Commit();
BOOST_CHECK_EQUAL(10, cache.Get(1));
cache.Put(1, 1);
//---
tx = transactions.TxStart();
cache.Put(1, 10);
BOOST_CHECK_EQUAL(10, cache.Get(1));
tx.Close();
BOOST_CHECK_EQUAL(1, cache.Get(1));
//---
tx = transactions.TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE);
cache.Put(1, 10);
BOOST_CHECK_EQUAL(10, cache.Get(1));
tx.Close();
BOOST_CHECK_EQUAL(1, cache.Get(1));
//---
tx = transactions.TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, 1000, 100);
cache.Put(1, 10);
BOOST_CHECK_EQUAL(10, cache.Get(1));
tx.Close();
BOOST_CHECK_EQUAL(1, cache.Get(1));
//---
tx = transactions.TxStart();
cache.Replace(1, 10);
BOOST_CHECK_EQUAL(10, cache.Get(1));
tx.Rollback();
BOOST_CHECK_EQUAL(1, cache.Get(1));
//---
tx = transactions.TxStart();
cache.Replace(1, 1, 10);
BOOST_CHECK_EQUAL(10, cache.Get(1));
tx.Rollback();
BOOST_CHECK_EQUAL(1, cache.Get(1));
//---
tx = transactions.TxStart();
cache.Put(2, 20);
BOOST_CHECK_EQUAL(cache.ContainsKey(2), true);
tx.Rollback();
BOOST_CHECK_EQUAL(cache.ContainsKey(2), false);
//---
tx = transactions.TxStart();
cache.Put(2, 20);
tx.Rollback();
BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::PRIMARY), 1);
//---
tx = transactions.TxStart();
int res1 = cache.GetAndPutIfAbsent(1, 10);
int res2 = cache.GetAndPutIfAbsent(2, 20);
BOOST_CHECK_EQUAL(1, res1);
BOOST_CHECK_EQUAL(cache.Get(2), 20);
BOOST_CHECK_EQUAL(0, res2);
tx.Rollback();
BOOST_CHECK_EQUAL(cache.Get(2), 0);
//---
tx = transactions.TxStart();
cache.Remove(1);
tx.Rollback();
BOOST_CHECK_EQUAL(cache.ContainsKey(1), true);
// Test transaction with a timeout.
const uint32_t TX_TIMEOUT = 200;
tx = transactions.TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, TX_TIMEOUT);
cache.Put(1, 10);
boost::this_thread::sleep_for(boost::chrono::milliseconds(2 * TX_TIMEOUT));
BOOST_CHECK_EXCEPTION(cache.Put(1, 20), ignite::IgniteError, checkTxTimeoutMessage);
BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, checkTxTimeoutMessage);
tx.Close();
BOOST_CHECK_EQUAL(1, cache.Get(1));
}
void startAnotherClientAndTx(SharedPointer<SingleLatch>& l)
{
IgniteClient client = IgniteTxTestSuiteFixture::StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
transactions::ClientTransactions transactions = client.ClientTransactions();
transactions::ClientTransaction tx = transactions.TxStart();
l.Get()->CountDown();
cache.Put(2, 20);
tx.Commit();
}
BOOST_AUTO_TEST_CASE(TestTxOps)
{
IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
cache.Put(1, 1);
transactions::ClientTransactions transactions = client.ClientTransactions();
transactions::ClientTransaction tx = transactions.TxStart();
BOOST_CHECK_THROW( transactions.TxStart(), ignite::IgniteError );
tx.Close();
//Test end of already completed transaction.
tx = transactions.TxStart();
tx.Close();
BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, correctCloseMessage);
BOOST_CHECK_EXCEPTION(tx.Rollback(), ignite::IgniteError, correctCloseMessage);
// Test end of outdated transaction.
transactions::ClientTransaction tx1 = transactions.TxStart();
BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, separateThreadMessage);
tx1.Close();
// Test end of outdated transaction.
tx = transactions.TxStart();
tx.Commit();
BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, correctCloseMessage);
tx.Close();
// Check multi threads.
SharedPointer<SingleLatch> latch = SharedPointer<SingleLatch>(new SingleLatch());
tx = transactions.TxStart();
cache.Put(1, 10);
boost::thread t2(startAnotherClientAndTx, latch);
latch.Get()->Await();
tx.Rollback();
t2.join();
BOOST_CHECK_EQUAL(1, cache.Get(1));
BOOST_CHECK_EQUAL(20, cache.Get(2));
}
const std::string label = std::string("label_2_check");
std::string label1 = std::string("label_2_check1");
bool checkTxLabelMessage(const ignite::IgniteError& ex)
{
return std::string(ex.what()).find(label) != std::string::npos;
}
bool checkTxLabel1Message(const ignite::IgniteError& ex)
{
return std::string(ex.what()).find("label_2_check1") != std::string::npos;
}
BOOST_AUTO_TEST_CASE(TestTxWithLabel)
{
IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
const uint32_t TX_TIMEOUT = 200;
transactions::ClientTransactions transactions = client.ClientTransactions();
transactions::ClientTransaction tx = transactions.withLabel(label).TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, TX_TIMEOUT);
cache.Put(1, 10);
boost::this_thread::sleep_for(boost::chrono::milliseconds(2 * TX_TIMEOUT));
BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, checkTxLabelMessage);
tx.Close();
// New label
tx = transactions.TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, TX_TIMEOUT);
cache.Put(1, 10);
boost::this_thread::sleep_for(boost::chrono::milliseconds(2 * TX_TIMEOUT));
BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, !checkTxLabelMessage);
tx.Close();
// Label is gone
tx = transactions.withLabel(label1).TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, TX_TIMEOUT);
label1 = "NULL";
cache.Put(1, 10);
boost::this_thread::sleep_for(boost::chrono::milliseconds(2 * TX_TIMEOUT));
BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, checkTxLabel1Message);
tx.Close();
}
BOOST_AUTO_TEST_CASE(ManyTransactions)
{
IgniteClient client = StartClient();
cache::CacheClient<int, int> cache =
client.GetCache<int, int>("partitioned");
transactions::ClientTransactions transactions = client.ClientTransactions();
const int32_t key = 42;
for (int32_t val = 0; val < 100; ++val) {
transactions::ClientTransaction tx = transactions.TxStart();
cache.Put(key, val);
tx.Commit();
BOOST_CHECK_EQUAL(val, cache.Get(key));
}
const int32_t expected = -42;
cache.Put(key, expected);
BOOST_CHECK_EQUAL(expected, cache.Get(key));
for (int32_t val = 0; val < 100; ++val) {
transactions::ClientTransaction tx = transactions.TxStart();
cache.Put(key, val);
tx.Rollback();
BOOST_CHECK_EQUAL(expected, cache.Get(key));
}
}
BOOST_AUTO_TEST_SUITE_END()