blob: 7e7fa2048190fddb5890f6e54b2b1d5ae37e8a3d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <boost/test/unit_test.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <ignite/ignition.h>
#include <ignite/test_utils.h>
using namespace ignite;
using namespace ignite::cache;
using namespace ignite::cluster;
using namespace ignite::compute;
using namespace ignite::common::concurrent;
using namespace ignite::impl;
using namespace ignite_test;
using namespace boost::unit_test;
/*
 * Presumably the number of retries allowed while waiting for a stable
 * topology (inferred from the name only).
 * NOTE(review): not referenced in the visible part of this file - confirm it is still needed.
 */
enum
{
    RETRIES_FOR_STABLE_TOPOLOGY = 5
};
/*
* Test setup fixture for cache affinity.
*/
struct ComputeTestSuiteFixtureAffinity
{
static const char* cacheName;
Ignite node0;
Ignite node1;
Ignite node2;
Ignite MakeNode(const char* name)
{
#ifdef IGNITE_TESTS_32
const char* config = "cache-test-32.xml";
#else
const char* config = "affinity-test.xml";
#endif
return StartNode(config, name);
}
/*
* Constructor.
*/
ComputeTestSuiteFixtureAffinity()
{
node0 = MakeNode("ComputeAffinityNode0");
BOOST_REQUIRE(WaitForRebalance0());
node1 = MakeNode("ComputeAffinityNode1");
BOOST_REQUIRE(WaitForRebalance1());
node2 = MakeNode("ComputeAffinityNode2");
BOOST_REQUIRE(WaitForRebalance2());
}
/*
* Destructor.
*/
~ComputeTestSuiteFixtureAffinity()
{
Ignition::StopAll(false);
}
/**
* Check whether rebalance is complete for the cluster.
* @return true if complete.
*/
bool IsRebalanceComplete0()
{
return
node0.GetAffinity<int32_t>(cacheName).MapKeyToNode(0).IsLocal() &&
node0.GetAffinity<int32_t>(cacheName).MapKeyToNode(1).IsLocal() &&
node0.GetAffinity<int32_t>(cacheName).MapKeyToNode(6).IsLocal();
}
/**
* Check whether rebalance is complete for the cluster.
* @return true if complete.
*/
bool IsRebalanceComplete1()
{
return
node0.GetAffinity<int32_t>(cacheName).MapKeyToNode(0).IsLocal() &&
node1.GetAffinity<int32_t>(cacheName).MapKeyToNode(1).IsLocal() &&
node1.GetAffinity<int32_t>(cacheName).MapKeyToNode(6).IsLocal();
}
/**
* Check whether rebalance is complete for the cluster.
* @return true if complete.
*/
bool IsRebalanceComplete2()
{
return
node0.GetAffinity<int32_t>(cacheName).MapKeyToNode(0).IsLocal() &&
node1.GetAffinity<int32_t>(cacheName).MapKeyToNode(1).IsLocal() &&
node2.GetAffinity<int32_t>(cacheName).MapKeyToNode(6).IsLocal();
}
/**
* Wait for rebalance.
* @param timeout Timeout to wait.
* @return True if condition was met, false if timeout has been reached.
*/
bool WaitForRebalance0(int32_t timeout = 5000)
{
return WaitForCondition(boost::bind(&ComputeTestSuiteFixtureAffinity::IsRebalanceComplete0, this), timeout);
}
/**
* Wait for rebalance.
* @param timeout Timeout to wait.
* @return True if condition was met, false if timeout has been reached.
*/
bool WaitForRebalance1(int32_t timeout = 5000)
{
return WaitForCondition(boost::bind(&ComputeTestSuiteFixtureAffinity::IsRebalanceComplete1, this), timeout);
}
/**
* Wait for rebalance.
* @param timeout Timeout to wait.
* @return True if condition was met, false if timeout has been reached.
*/
bool WaitForRebalance2(int32_t timeout = 5000)
{
return WaitForCondition(boost::bind(&ComputeTestSuiteFixtureAffinity::IsRebalanceComplete2, this), timeout);
}
};
const char* ComputeTestSuiteFixtureAffinity::cacheName = "test_backups_0";
/*
 * Test setup fixture.
 *
 * Starts one node ("ComputeNode1") for the lifetime of a test case; test
 * cases may start additional nodes through MakeNode().
 */
struct ComputeTestSuiteFixture
{
    Ignite node;

    /**
     * Start a node using the cache test configuration
     * (the "-32" variant when IGNITE_TESTS_32 is defined).
     *
     * @param name Node name.
     * @return Started node.
     */
    Ignite MakeNode(const char* name)
    {
#ifdef IGNITE_TESTS_32
        return StartNode("cache-test-32.xml", name);
#else
        return StartNode("cache-test.xml", name);
#endif
    }

    /*
     * Constructor.
     */
    ComputeTestSuiteFixture() : node(MakeNode("ComputeNode1"))
    {
        // No-op.
    }

    /*
     * Destructor.
     * Stops every started node, passing 'true' to Ignition::StopAll.
     */
    ~ComputeTestSuiteFixture()
    {
        Ignition::StopAll(true);
    }
};
/*
 * Test setup fixture for cluster group.
 *
 * Starts three server nodes (one from "compute-server0.xml", two from
 * "compute-server1.xml") and one node from "compute-client.xml".
 */
struct ComputeTestSuiteFixtureClusterGroup
{
    /** Selects which configuration file MakeNode() uses. */
    enum NodeType
    {
        SERVER_NODE_ATTRIBUTE_VALUE0,
        SERVER_NODE_ATTRIBUTE_VALUE1,
        CLIENT_NODE
    };

    Ignite server0;
    Ignite server1;
    Ignite server2;
    Ignite client;

    /**
     * Start a node with the configuration matching the requested type.
     *
     * @param name Node name.
     * @param type Kind of node to start.
     * @return Started node.
     */
    Ignite MakeNode(const char* name, NodeType type)
    {
        std::string config;

        if (type == SERVER_NODE_ATTRIBUTE_VALUE0)
            config = "compute-server0.xml";
        else if (type == SERVER_NODE_ATTRIBUTE_VALUE1)
            config = "compute-server1.xml";
        else if (type == CLIENT_NODE)
            config = "compute-client.xml";

#ifdef IGNITE_TESTS_32
        // Rewrite "<base>.xml" into "<base>-32.xml".
        const std::string::size_type extPos = config.find(".xml");
        config.replace(config.begin() + extPos, config.end(), "-32.xml");
#endif

        return StartNode(config.c_str(), name);
    }

    /*
     * Constructor.
     */
    ComputeTestSuiteFixtureClusterGroup() :
        server0(MakeNode("ServerNode0", SERVER_NODE_ATTRIBUTE_VALUE0)),
        server1(MakeNode("ServerNode1", SERVER_NODE_ATTRIBUTE_VALUE1)),
        server2(MakeNode("ServerNode2", SERVER_NODE_ATTRIBUTE_VALUE1)),
        client(MakeNode("ClientNode", CLIENT_NODE))
    {
        // No-op.
    }

    /*
     * Destructor.
     * Stops every started node, passing 'true' to Ignition::StopAll.
     */
    ~ComputeTestSuiteFixtureClusterGroup()
    {
        Ignition::StopAll(true);
    }
};
/*
 * Compute function returning its two operands formatted as "<a>.<b>".
 * If it carries an error whose code is not IGNITE_SUCCESS, Call() throws
 * that error instead.
 */
struct Func1 : ComputeFunc<std::string>
{
    /** Value-initializes both operands and the error. */
    Func1() : a(), b(), err()
    {
        // No-op.
    }

    /**
     * @param first Value for operand a.
     * @param second Value for operand b.
     */
    Func1(int32_t first, int32_t second) : a(first), b(second), err()
    {
        // No-op.
    }

    /**
     * @param error Error for Call() to throw.
     */
    Func1(IgniteError error) : a(), b(), err(error)
    {
        // No-op.
    }

    /**
     * @return "<a>.<b>".
     * @throw IgniteError The stored error, when its code is not IGNITE_SUCCESS.
     */
    virtual std::string Call()
    {
        const bool failing = err.GetCode() != IgniteError::IGNITE_SUCCESS;

        if (failing)
            throw err;

        std::ostringstream text;
        text << a << '.' << b;

        return text.str();
    }

    int32_t a;
    int32_t b;
    IgniteError err;
};
/*
 * Compute function that sleeps for 200 ms and then returns its two operands
 * formatted as "<a>.<b>". If it carries an error whose code is not
 * IGNITE_SUCCESS, Call() throws that error after the sleep instead.
 */
struct Func2 : ComputeFunc<std::string>
{
    /** Value-initializes both operands and the error. */
    Func2() : a(), b(), err()
    {
        // No-op.
    }

    /**
     * @param first Value for operand a.
     * @param second Value for operand b.
     */
    Func2(int32_t first, int32_t second) : a(first), b(second), err()
    {
        // No-op.
    }

    /**
     * @param error Error for Call() to throw.
     */
    Func2(IgniteError error) : a(), b(), err(error)
    {
        // No-op.
    }

    /**
     * @return "<a>.<b>".
     * @throw IgniteError The stored error, when its code is not IGNITE_SUCCESS.
     */
    virtual std::string Call()
    {
        // The delay happens first, so both the result and the error arrive late.
        boost::this_thread::sleep_for(boost::chrono::milliseconds(200));

        const bool failing = err.GetCode() != IgniteError::IGNITE_SUCCESS;

        if (failing)
            throw err;

        std::ostringstream text;
        text << a << '.' << b;

        return text.str();
    }

    int32_t a;
    int32_t b;
    IgniteError err;
};
/*
 * Void compute function that sleeps for 200 ms and then stores its two
 * operands, formatted as "<a>.<b>", into the static member 'res'. If it
 * carries an error whose code is not IGNITE_SUCCESS, Call() throws that
 * error after the sleep and leaves 'res' untouched.
 */
struct Func3 : ComputeFunc<void>
{
    /** Value-initializes both operands and the error. */
    Func3() : a(), b(), err()
    {
        // No-op.
    }

    /**
     * @param first Value for operand a.
     * @param second Value for operand b.
     */
    Func3(int32_t first, int32_t second) : a(first), b(second), err()
    {
        // No-op.
    }

    /**
     * @param error Error for Call() to throw.
     */
    Func3(IgniteError error) : a(), b(), err(error)
    {
        // No-op.
    }

    /**
     * @throw IgniteError The stored error, when its code is not IGNITE_SUCCESS.
     */
    virtual void Call()
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(200));

        const bool failing = err.GetCode() != IgniteError::IGNITE_SUCCESS;

        if (failing)
            throw err;

        std::ostringstream text;
        text << a << '.' << b;

        res = text.str();
    }

    int32_t a;
    int32_t b;
    IgniteError err;

    /** Output of the most recent successful Call(); shared by all instances. */
    static std::string res;
};

std::string Func3::res;
/*
 * Deleter that intentionally does nothing with the environment it is given.
 * NOTE(review): no caller is visible in this part of the file - confirm it is still used.
 */
void EmptyDeleter(IgniteEnvironment* /* unused */)
{
    // No-op.
}
/*
 * Compute function that peeks a key in the named cache on the node it runs
 * on (PRIMARY peek mode) and returns what it finds.
 * The 'err' member is stored and serialized but is not consulted by Call().
 */
struct FuncAffinityCall : ComputeFunc<int32_t>
{
    /** Value-initializes every member. */
    FuncAffinityCall() : cacheName(), cacheKey(), err()
    {
        // No-op.
    }

    /**
     * @param name Name of the cache to peek into.
     * @param key Key to peek.
     */
    FuncAffinityCall(std::string name, int32_t key) : cacheName(name), cacheKey(key), err()
    {
        // No-op.
    }

    /**
     * @param error Error to carry.
     */
    FuncAffinityCall(IgniteError error) : cacheName(), cacheKey(), err(error)
    {
        // No-op.
    }

    /**
     * @return Result of LocalPeek(cacheKey, PRIMARY) on the executing node.
     */
    virtual int32_t Call()
    {
        Cache<int32_t, int32_t> target = GetIgnite().GetCache<int32_t, int32_t>(cacheName.c_str());

        return target.LocalPeek(cacheKey, CachePeekMode::PRIMARY);
    }

    std::string cacheName;
    int32_t cacheKey;
    IgniteError err;
};
/*
 * Void compute function that peeks 'cacheKey' in the named cache on the node
 * it runs on (PRIMARY peek mode) and publishes the peeked value under
 * 'checkKey' in the same cache, so that the caller can read it back.
 * The 'err' member is stored and serialized but is not consulted by Call().
 */
struct FuncAffinityRun : ComputeFunc<void>
{
    /**
     * Value-initializes every member, including checkKey, so that
     * serialization never reads an indeterminate value.
     */
    FuncAffinityRun() :
        cacheName(), cacheKey(), checkKey(), err()
    {
        // No-op.
    }

    /**
     * @param cacheName Name of the cache to work with.
     * @param cacheKey Key to peek locally.
     * @param checkKey Key under which the peeked value is stored.
     */
    FuncAffinityRun(std::string cacheName, int32_t cacheKey, int32_t checkKey) :
        cacheName(cacheName), cacheKey(cacheKey), checkKey(checkKey), err()
    {
        // No-op.
    }

    /**
     * @param err Error to carry.
     */
    FuncAffinityRun(IgniteError err) :
        cacheName(), cacheKey(), checkKey(), err(err)
    {
        // No-op.
    }

    /**
     * Peek cacheKey locally and put the result under checkKey.
     */
    virtual void Call()
    {
        Ignite& node = GetIgnite();
        Cache<int32_t, int32_t> cache = node.GetCache<int32_t, int32_t>(cacheName.c_str());

        int32_t res = cache.LocalPeek(cacheKey, CachePeekMode::PRIMARY);
        cache.Put(checkKey, res);

        // Read the value back; the result itself is not used further.
        res = cache.Get(checkKey);
    }

    std::string cacheName;
    int32_t cacheKey;
    int32_t checkKey;
    IgniteError err;
};
namespace ignite
{
namespace binary
{
/**
 * Binary serialization of Func1: fields "a", "b" and "err", with the type
 * name "Func1"; everything else comes from BinaryTypeDefaultAll.
 */
template<>
struct BinaryType<Func1> : BinaryTypeDefaultAll<Func1>
{
    /** Report the binary type name. */
    static void GetTypeName(std::string& name)
    {
        name.assign("Func1");
    }

    /** Write every field of @c src by name. */
    static void Write(BinaryWriter& out, const Func1& src)
    {
        out.WriteInt32("a", src.a);
        out.WriteInt32("b", src.b);
        out.WriteObject<IgniteError>("err", src.err);
    }

    /** Fill every field of @c target by name. */
    static void Read(BinaryReader& in, Func1& target)
    {
        target.a = in.ReadInt32("a");
        target.b = in.ReadInt32("b");
        target.err = in.ReadObject<IgniteError>("err");
    }
};
/**
 * Binary serialization of Func2: fields "a", "b" and "err", with the type
 * name "Func2"; everything else comes from BinaryTypeDefaultAll.
 */
template<>
struct BinaryType<Func2> : BinaryTypeDefaultAll<Func2>
{
    /** Report the binary type name. */
    static void GetTypeName(std::string& name)
    {
        name.assign("Func2");
    }

    /** Write every field of @c src by name. */
    static void Write(BinaryWriter& out, const Func2& src)
    {
        out.WriteInt32("a", src.a);
        out.WriteInt32("b", src.b);
        out.WriteObject<IgniteError>("err", src.err);
    }

    /** Fill every field of @c target by name. */
    static void Read(BinaryReader& in, Func2& target)
    {
        target.a = in.ReadInt32("a");
        target.b = in.ReadInt32("b");
        target.err = in.ReadObject<IgniteError>("err");
    }
};
/**
 * Binary serialization of Func3: fields "a", "b" and "err", with the type
 * name "Func3"; everything else comes from BinaryTypeDefaultAll.
 * The static Func3::res is not part of the serialized form.
 */
template<>
struct BinaryType<Func3> : BinaryTypeDefaultAll<Func3>
{
    /** Report the binary type name. */
    static void GetTypeName(std::string& name)
    {
        name.assign("Func3");
    }

    /** Write every field of @c src by name. */
    static void Write(BinaryWriter& out, const Func3& src)
    {
        out.WriteInt32("a", src.a);
        out.WriteInt32("b", src.b);
        out.WriteObject<IgniteError>("err", src.err);
    }

    /** Fill every field of @c target by name. */
    static void Read(BinaryReader& in, Func3& target)
    {
        target.a = in.ReadInt32("a");
        target.b = in.ReadInt32("b");
        target.err = in.ReadObject<IgniteError>("err");
    }
};
/**
 * Binary serialization of FuncAffinityCall: fields "cacheName", "cacheKey"
 * and "err", with the type name "FuncAffinityCall"; everything else comes
 * from BinaryTypeDefaultAll.
 */
template<>
struct BinaryType<FuncAffinityCall> : BinaryTypeDefaultAll<FuncAffinityCall>
{
    /** Report the binary type name. */
    static void GetTypeName(std::string& name)
    {
        name.assign("FuncAffinityCall");
    }

    /** Write every field of @c src by name. */
    static void Write(BinaryWriter& out, const FuncAffinityCall& src)
    {
        out.WriteString("cacheName", src.cacheName);
        out.WriteInt32("cacheKey", src.cacheKey);
        out.WriteObject<IgniteError>("err", src.err);
    }

    /** Fill every field of @c target by name. */
    static void Read(BinaryReader& in, FuncAffinityCall& target)
    {
        target.cacheName = in.ReadString("cacheName");
        target.cacheKey = in.ReadInt32("cacheKey");
        target.err = in.ReadObject<IgniteError>("err");
    }
};
/**
 * Binary serialization of FuncAffinityRun: fields "cacheName", "cacheKey",
 * "checkKey" and "err", with the type name "FuncAffinityRun"; everything
 * else comes from BinaryTypeDefaultAll.
 */
template<>
struct BinaryType<FuncAffinityRun> : BinaryTypeDefaultAll<FuncAffinityRun>
{
    /** Report the binary type name. */
    static void GetTypeName(std::string& name)
    {
        name.assign("FuncAffinityRun");
    }

    /** Write every field of @c src by name. */
    static void Write(BinaryWriter& out, const FuncAffinityRun& src)
    {
        out.WriteString("cacheName", src.cacheName);
        out.WriteInt32("cacheKey", src.cacheKey);
        out.WriteInt32("checkKey", src.checkKey);
        out.WriteObject<IgniteError>("err", src.err);
    }

    /** Fill every field of @c target by name. */
    static void Read(BinaryReader& in, FuncAffinityRun& target)
    {
        target.cacheName = in.ReadString("cacheName");
        target.cacheKey = in.ReadInt32("cacheKey");
        target.checkKey = in.ReadInt32("checkKey");
        target.err = in.ReadObject<IgniteError>("err");
    }
};
}
}
/*
 * Exported module initialization entry point.
 * Registers every compute function type defined in this file with the
 * binding obtained from the supplied context.
 */
IGNITE_EXPORTED_CALL void IgniteModuleInit1(IgniteBindingContext& context)
{
    IgniteBinding registry = context.GetBinding();

    // Plain compute functions.
    registry.RegisterComputeFunc<Func1>();
    registry.RegisterComputeFunc<Func2>();
    registry.RegisterComputeFunc<Func3>();

    // Affinity-aware compute functions.
    registry.RegisterComputeFunc<FuncAffinityCall>();
    registry.RegisterComputeFunc<FuncAffinityRun>();
}
/*
 * Collect the smallest non-negative int32 keys for which the given node is
 * primary according to the given affinity.
 *
 * Keys are probed in increasing order starting from 0. If fewer than 'num'
 * keys are found below INT_MAX, a Boost.Test check failure is recorded and
 * the keys found so far are returned.
 *
 * @param num Number of keys wanted; 0 yields an empty result without probing.
 * @param node Node that has to be primary for the returned keys.
 * @param affinity Affinity used to answer the "is primary" question.
 * @return Keys found, in increasing order.
 */
template<typename TK>
std::vector<int32_t> GetPrimaryKeys(size_t num, ClusterNode& node, CacheAffinity<TK>& affinity)
{
    std::vector<int32_t> found;

    if (num == 0)
        return found;

    int32_t candidate = 0;

    while (candidate < INT_MAX)
    {
        if (affinity.IsPrimary(node, candidate))
        {
            found.push_back(candidate);

            if (found.size() >= num)
                return found;
        }

        ++candidate;
    }

    // The whole key range was scanned without finding enough keys.
    BOOST_CHECK(false);

    return found;
}
BOOST_FIXTURE_TEST_SUITE(ComputeTestSuiteAffinity, ComputeTestSuiteFixtureAffinity)
/*
 * AffinityCall with FuncAffinityCall: when the affinity key is primary on
 * the node that is primary for the stored key, the local peek sees the stored
 * value; when the affinity key is primary on a different node, it sees 0.
 */
BOOST_AUTO_TEST_CASE(IgniteAffinityCall)
{
    typedef std::vector<int32_t>::iterator KeyIter;

    const int32_t key = 100;
    const int32_t value = 500;

    Cache<int32_t, int32_t> cache = node0.GetCache<int32_t, int32_t>(cacheName);
    cache.Put(key, value);

    CacheAffinity<int> affinity = node0.GetAffinity<int32_t>(cache.GetName());
    Compute compute = node0.GetCompute();

    // Node the stored key is mapped to.
    ClusterNode clusterNode0 = affinity.MapKeyToNode(key);

    BOOST_TEST_CHECKPOINT("Starting local calls loop");

    std::vector<int32_t> aKeys = GetPrimaryKeys(10, clusterNode0, affinity);

    for (KeyIter it = aKeys.begin(); it != aKeys.end(); ++it)
    {
        int32_t res = compute.AffinityCall<int32_t>(cache.GetName(), *it,
            FuncAffinityCall(cache.GetName(), key));

        BOOST_CHECK_EQUAL(res, value);
    }

    // Pick a node different from clusterNode0: node0's local node, or node1's if they coincide.
    ClusterNode clusterNode1 = node0.GetCluster().GetLocalNode();

    if (clusterNode0.GetId() == clusterNode1.GetId())
        clusterNode1 = node1.GetCluster().GetLocalNode();

    BOOST_REQUIRE_NE(clusterNode0.GetId(), clusterNode1.GetId());
    BOOST_REQUIRE(!affinity.IsPrimary(clusterNode1, key));

    BOOST_TEST_CHECKPOINT("Starting remote calls loop");

    aKeys = GetPrimaryKeys(10, clusterNode1, affinity);

    for (KeyIter it = aKeys.begin(); it != aKeys.end(); ++it)
    {
        int32_t res = compute.AffinityCall<int32_t>(cache.GetName(), *it,
            FuncAffinityCall(cache.GetName(), key));

        BOOST_CHECK_EQUAL(res, 0);
    }
}
/*
 * Asynchronous counterpart of the affinity call test: the future produced by
 * AffinityCallAsync yields the stored value when the affinity key is primary
 * on the node owning the stored key, and 0 when it is primary elsewhere.
 */
BOOST_AUTO_TEST_CASE(IgniteAffinityCallAsync)
{
    typedef std::vector<int32_t>::iterator KeyIter;

    const int32_t key = 100;
    const int32_t value = 500;

    Cache<int32_t, int32_t> cache = node0.GetCache<int32_t, int32_t>(cacheName);
    cache.Put(key, value);

    CacheAffinity<int> affinity = node0.GetAffinity<int32_t>(cache.GetName());
    Compute compute = node0.GetCompute();

    // Node the stored key is mapped to.
    ClusterNode clusterNode0 = affinity.MapKeyToNode(key);

    BOOST_TEST_CHECKPOINT("Starting calls loop");

    std::vector<int32_t> aKeys = GetPrimaryKeys(10, clusterNode0, affinity);

    for (KeyIter it = aKeys.begin(); it != aKeys.end(); ++it)
    {
        Future<int32_t> res = compute.AffinityCallAsync<int32_t>(cache.GetName(), *it,
            FuncAffinityCall(cache.GetName(), key));

        int32_t resVal = res.GetValue();

        BOOST_CHECK_EQUAL(value, resVal);
    }

    // Pick a node different from clusterNode0: node0's local node, or node1's if they coincide.
    ClusterNode clusterNode1 = node0.GetCluster().GetLocalNode();

    if (clusterNode0.GetId() == clusterNode1.GetId())
        clusterNode1 = node1.GetCluster().GetLocalNode();

    BOOST_REQUIRE_NE(clusterNode0.GetId(), clusterNode1.GetId());
    BOOST_REQUIRE(!affinity.IsPrimary(clusterNode1, key));

    BOOST_TEST_CHECKPOINT("Starting remote calls loop");

    aKeys = GetPrimaryKeys(10, clusterNode1, affinity);

    for (KeyIter it = aKeys.begin(); it != aKeys.end(); ++it)
    {
        Future<int32_t> res = compute.AffinityCallAsync<int32_t>(cache.GetName(), *it,
            FuncAffinityCall(cache.GetName(), key));

        int32_t resVal = res.GetValue();

        BOOST_CHECK_EQUAL(0, resVal);
    }
}
/*
 * AffinityRun with FuncAffinityRun: the function publishes its local peek of
 * 'key' under 'checkKey'. The published value is 500 when the affinity key is
 * primary on the node owning 'key', and 0 when it is primary elsewhere.
 */
BOOST_AUTO_TEST_CASE(IgniteAffinityRun)
{
    typedef std::vector<int32_t>::iterator KeyIter;

    const int32_t key = 100;
    const int32_t checkKey = -1;
    const int32_t value = 500;

    Cache<int32_t, int32_t> cache = node0.GetCache<int32_t, int32_t>(cacheName);
    cache.Put(key, value);

    CacheAffinity<int> affinity = node0.GetAffinity<int32_t>(cache.GetName());
    Compute compute = node0.GetCompute();

    // Node the stored key is mapped to.
    ClusterNode clusterNode0 = affinity.MapKeyToNode(key);

    BOOST_TEST_CHECKPOINT("Starting calls loop");

    std::vector<int32_t> aKeys = GetPrimaryKeys(10, clusterNode0, affinity);

    for (KeyIter it = aKeys.begin(); it != aKeys.end(); ++it)
    {
        compute.AffinityRun(cache.GetName(), *it,
            FuncAffinityRun(cache.GetName(), key, checkKey));

        int32_t resVal = cache.Get(checkKey);

        BOOST_CHECK_EQUAL(500, resVal);
    }

    // Pick a node different from clusterNode0: node0's local node, or node1's if they coincide.
    ClusterNode clusterNode1 = node0.GetCluster().GetLocalNode();

    if (clusterNode0.GetId() == clusterNode1.GetId())
        clusterNode1 = node1.GetCluster().GetLocalNode();

    BOOST_REQUIRE_NE(clusterNode0.GetId(), clusterNode1.GetId());
    BOOST_REQUIRE(!affinity.IsPrimary(clusterNode1, key));

    BOOST_TEST_CHECKPOINT("Starting remote calls loop");

    aKeys = GetPrimaryKeys(10, clusterNode1, affinity);

    for (KeyIter it = aKeys.begin(); it != aKeys.end(); ++it)
    {
        compute.AffinityRun(cache.GetName(), *it,
            FuncAffinityRun(cache.GetName(), key, checkKey));

        int32_t resVal = cache.Get(checkKey);

        BOOST_CHECK_EQUAL(0, resVal);
    }
}
/*
 * Asynchronous counterpart of the affinity run test: after the future from
 * AffinityRunAsync completes, 'checkKey' holds 500 when the affinity key is
 * primary on the node owning 'key', and 0 when it is primary elsewhere.
 */
BOOST_AUTO_TEST_CASE(IgniteAffinityRunAsync)
{
    typedef std::vector<int32_t>::iterator KeyIter;

    const int32_t key = 100;
    const int32_t checkKey = -1;
    const int32_t value = 500;

    Cache<int32_t, int32_t> cache = node0.GetCache<int32_t, int32_t>(cacheName);
    cache.Put(key, value);

    CacheAffinity<int> affinity = node0.GetAffinity<int32_t>(cache.GetName());
    Compute compute = node0.GetCompute();

    // Node the stored key is mapped to.
    ClusterNode clusterNode0 = affinity.MapKeyToNode(key);

    BOOST_TEST_CHECKPOINT("Starting calls loop");

    std::vector<int32_t> aKeys = GetPrimaryKeys(10, clusterNode0, affinity);

    for (KeyIter it = aKeys.begin(); it != aKeys.end(); ++it)
    {
        Future<void> res = compute.AffinityRunAsync(cache.GetName(), *it,
            FuncAffinityRun(cache.GetName(), key, checkKey));

        // Block until the job is done before inspecting the cache.
        res.GetValue();

        int32_t resVal = cache.Get(checkKey);

        BOOST_CHECK_EQUAL(500, resVal);
    }

    // Pick a node different from clusterNode0: node0's local node, or node1's if they coincide.
    ClusterNode clusterNode1 = node0.GetCluster().GetLocalNode();

    if (clusterNode0.GetId() == clusterNode1.GetId())
        clusterNode1 = node1.GetCluster().GetLocalNode();

    BOOST_REQUIRE_NE(clusterNode0.GetId(), clusterNode1.GetId());
    BOOST_REQUIRE(!affinity.IsPrimary(clusterNode1, key));

    BOOST_TEST_CHECKPOINT("Starting remote calls loop");

    aKeys = GetPrimaryKeys(10, clusterNode1, affinity);

    for (KeyIter it = aKeys.begin(); it != aKeys.end(); ++it)
    {
        Future<void> res = compute.AffinityRunAsync<int32_t>(cache.GetName(), *it,
            FuncAffinityRun(cache.GetName(), key, checkKey));

        // Block until the job is done before inspecting the cache.
        res.GetValue();

        int32_t resVal = cache.Get(checkKey);

        BOOST_CHECK_EQUAL(0, resVal);
    }
}
BOOST_AUTO_TEST_SUITE_END()
BOOST_FIXTURE_TEST_SUITE(ComputeTestSuite, ComputeTestSuiteFixture)
/*
 * Synchronous Call of Func1(8, 5) on a single-node cluster returns "8.5".
 */
BOOST_AUTO_TEST_CASE(IgniteCallSyncLocal)
{
    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Making Call");

    Func1 job(8, 5);
    std::string res = compute.Call<std::string>(job);

    BOOST_CHECK_EQUAL(res, "8.5");
}
/*
 * Asynchronous Call of Func2(312, 245) on a single-node cluster.
 * Func2 sleeps for 200 ms, so the future is expected to be not ready both
 * right after submission and after a WaitFor(100); GetValue() then blocks
 * and yields "312.245".
 */
BOOST_AUTO_TEST_CASE(IgniteCallAsyncLocal)
{
    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Making Call");

    Func2 job(312, 245);
    Future<std::string> res = compute.CallAsync<std::string>(job);

    BOOST_CHECK(!res.IsReady());

    BOOST_TEST_CHECKPOINT("Waiting with timeout");

    res.WaitFor(100);

    BOOST_CHECK(!res.IsReady());

    BOOST_CHECK_EQUAL(res.GetValue(), "312.245");
}
/*
 * Synchronous Call of a Func1 that carries the test error: the call must
 * throw an IgniteError accepted by the IsTestError predicate.
 */
BOOST_AUTO_TEST_CASE(IgniteCallSyncLocalError)
{
Compute compute = node.GetCompute();
BOOST_TEST_CHECKPOINT("Making Call");
// Func1::Call() throws its stored error; it has to surface from Compute::Call.
BOOST_CHECK_EXCEPTION(compute.Call<std::string>(Func1(MakeTestError())), IgniteError, IsTestError);
}
/*
 * Asynchronous Call of a Func2 that carries the test error.
 * Func2 sleeps for 200 ms before throwing, so the future is expected to be
 * not ready both right after submission and after a WaitFor(100);
 * GetValue() must then throw an IgniteError accepted by IsTestError.
 */
BOOST_AUTO_TEST_CASE(IgniteCallAsyncLocalError)
{
    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Making Call");

    Func2 failingJob(MakeTestError());
    Future<std::string> res = compute.CallAsync<std::string>(failingJob);

    BOOST_CHECK(!res.IsReady());

    BOOST_TEST_CHECKPOINT("Waiting with timeout");

    res.WaitFor(100);

    BOOST_CHECK(!res.IsReady());

    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
}
/*
 * Call on a two-node cluster: a slow Func2 is submitted first and its future
 * dropped, then a synchronous Func1(42, 24) call must return "42.24".
 * NOTE(review): the first job presumably keeps one node busy so that the
 * second job lands on the other node - confirm against the load balancing in use.
 */
BOOST_AUTO_TEST_CASE(IgniteCallTestRemote)
{
    // The second node only has to exist for the duration of the test.
    Ignite secondNode = MakeNode("ComputeNode2");

    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Making Call");

    Func2 slowJob(8, 5);
    compute.CallAsync<std::string>(slowJob);

    Func1 job(42, 24);
    std::string res = compute.Call<std::string>(job);

    BOOST_CHECK_EQUAL(res, "42.24");
}
/*
 * Failing asynchronous Call on a two-node cluster: a slow Func2 is submitted
 * first and its future dropped, then a Func2 carrying the test error is
 * submitted. Its future is expected to be not ready right after submission
 * and after a WaitFor(100); GetValue() must throw the test error.
 * NOTE(review): the first job presumably keeps one node busy so that the
 * second job lands on the other node - confirm against the load balancing in use.
 */
BOOST_AUTO_TEST_CASE(IgniteCallTestRemoteError)
{
    // The second node only has to exist for the duration of the test.
    Ignite secondNode = MakeNode("ComputeNode2");

    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Making Call");

    Func2 slowJob(8, 5);
    compute.CallAsync<std::string>(slowJob);

    Func2 failingJob(MakeTestError());
    Future<std::string> res = compute.CallAsync<std::string>(failingJob);

    BOOST_CHECK(!res.IsReady());

    BOOST_TEST_CHECKPOINT("Waiting with timeout");

    res.WaitFor(100);

    BOOST_CHECK(!res.IsReady());

    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
}
/*
 * Synchronous Run of Func3(8, 5) on a single-node cluster leaves "8.5" in
 * the static Func3::res.
 */
BOOST_AUTO_TEST_CASE(IgniteRunSyncLocal)
{
    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Running");

    Func3 job(8, 5);
    compute.Run(job);

    BOOST_CHECK_EQUAL(Func3::res, "8.5");
}
/*
 * Asynchronous Run of Func3(312, 245) on a single-node cluster.
 * Func3 sleeps for 200 ms, so the future is expected to be not ready both
 * right after submission and after a WaitFor(100). Once GetValue() returns,
 * Func3::res must hold "312.245".
 */
BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocal)
{
    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Running");

    Func3 job(312, 245);
    Future<void> res = compute.RunAsync(job);

    BOOST_CHECK(!res.IsReady());

    BOOST_TEST_CHECKPOINT("Waiting with timeout");

    res.WaitFor(100);

    BOOST_CHECK(!res.IsReady());

    // Block until the job has finished before looking at its output.
    res.GetValue();

    BOOST_CHECK_EQUAL(Func3::res, "312.245");
}
/*
 * Synchronous Run of a Func3 that carries the test error: the call must
 * throw an IgniteError accepted by the IsTestError predicate.
 */
BOOST_AUTO_TEST_CASE(IgniteRunSyncLocalError)
{
Compute compute = node.GetCompute();
BOOST_TEST_CHECKPOINT("Running");
// Func3::Call() throws its stored error; it has to surface from Compute::Run.
BOOST_CHECK_EXCEPTION(compute.Run(Func3(MakeTestError())), IgniteError, IsTestError);
}
/*
 * Asynchronous Run of a Func3 that carries the test error.
 * Func3 sleeps for 200 ms before throwing, so the future is expected to be
 * not ready both right after submission and after a WaitFor(100);
 * GetValue() must then throw an IgniteError accepted by IsTestError.
 */
BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocalError)
{
    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Running");

    Func3 failingJob(MakeTestError());
    Future<void> res = compute.RunAsync(failingJob);

    BOOST_CHECK(!res.IsReady());

    BOOST_TEST_CHECKPOINT("Waiting with timeout");

    res.WaitFor(100);

    BOOST_CHECK(!res.IsReady());

    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
}
/*
 * Run on a two-node cluster: a slow Func2 is submitted first and its future
 * dropped, then a synchronous Run of Func3(42, 24) must leave "42.24" in
 * Func3::res.
 * NOTE(review): the first job presumably keeps one node busy so that the
 * second job lands on the other node - confirm against the load balancing in use.
 */
BOOST_AUTO_TEST_CASE(IgniteRunRemote)
{
    // The second node only has to exist for the duration of the test.
    Ignite secondNode = MakeNode("ComputeNode2");

    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Running");

    Func2 slowJob(8, 5);
    compute.CallAsync<std::string>(slowJob);

    Func3 job(42, 24);
    compute.Run(job);

    BOOST_CHECK_EQUAL(Func3::res, "42.24");
}
/*
 * Failing asynchronous Run on a two-node cluster: a slow Func2 is submitted
 * first and its future dropped, then a Func3 carrying the test error is
 * submitted. Its future is expected to be not ready right after submission
 * and after a WaitFor(100); GetValue() must throw the test error.
 * NOTE(review): the first job presumably keeps one node busy so that the
 * second job lands on the other node - confirm against the load balancing in use.
 */
BOOST_AUTO_TEST_CASE(IgniteRunRemoteError)
{
    // The second node only has to exist for the duration of the test.
    Ignite secondNode = MakeNode("ComputeNode2");

    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Running");

    Func2 slowJob(8, 5);
    compute.CallAsync<std::string>(slowJob);

    Func3 failingJob(MakeTestError());
    Future<void> res = compute.RunAsync(failingJob);

    BOOST_CHECK(!res.IsReady());

    BOOST_TEST_CHECKPOINT("Waiting with timeout");

    res.WaitFor(100);

    BOOST_CHECK(!res.IsReady());

    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
}
/*
 * Synchronous Broadcast of Func2(8, 5) on a single-node cluster yields
 * exactly one result, "8.5".
 */
BOOST_AUTO_TEST_CASE(IgniteBroadcastLocalSync)
{
    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Broadcasting");

    std::vector<std::string> res = compute.Broadcast<std::string>(Func2(8, 5));

    // Fatal on mismatch: the element access below must never run on a vector of the wrong size.
    BOOST_REQUIRE_EQUAL(res.size(), 1);

    BOOST_CHECK_EQUAL(res[0], "8.5");
}
/*
 * Asynchronous Broadcast of Func2(312, 245) on a single-node cluster.
 * Func2 sleeps for 200 ms, so the future is expected to be not ready both
 * right after submission and after a WaitFor(100); GetValue() then yields
 * exactly one result, "312.245".
 */
BOOST_AUTO_TEST_CASE(IgniteBroadcastLocalAsync)
{
    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Broadcasting");

    Future< std::vector<std::string> > res = compute.BroadcastAsync<std::string>(Func2(312, 245));

    BOOST_CHECK(!res.IsReady());

    BOOST_TEST_CHECKPOINT("Waiting with timeout");

    res.WaitFor(100);

    BOOST_CHECK(!res.IsReady());

    std::vector<std::string> value = res.GetValue();

    // Fatal on mismatch: the element access below must never run on a vector of the wrong size.
    BOOST_REQUIRE_EQUAL(value.size(), 1);

    BOOST_CHECK_EQUAL(value[0], "312.245");
}
/*
 * Synchronous Broadcast of a Func2 that carries the test error: the call
 * must throw an IgniteError accepted by the IsTestError predicate.
 */
BOOST_AUTO_TEST_CASE(IgniteBroadcastSyncLocalError)
{
Compute compute = node.GetCompute();
BOOST_TEST_CHECKPOINT("Broadcasting");
// Broadcast is invoked without a result type here; only the thrown error matters.
BOOST_CHECK_EXCEPTION(compute.Broadcast(Func2(MakeTestError())), IgniteError, IsTestError);
}
/*
 * Asynchronous Broadcast of a Func2 that carries the test error.
 * Func2 sleeps for 200 ms before throwing, so the future is expected to be
 * not ready both right after submission and after a WaitFor(100);
 * GetValue() must then throw an IgniteError accepted by IsTestError.
 */
BOOST_AUTO_TEST_CASE(IgniteBroadcastAsyncLocalError)
{
    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Broadcasting");

    // BroadcastAsync is used without a result type here, hence Future<void>.
    Func2 failingJob(MakeTestError());
    Future<void> res = compute.BroadcastAsync(failingJob);

    BOOST_CHECK(!res.IsReady());

    BOOST_TEST_CHECKPOINT("Waiting with timeout");

    res.WaitFor(100);

    BOOST_CHECK(!res.IsReady());

    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
}
/*
 * Synchronous Broadcast of Func2(8, 5) on a two-node cluster yields exactly
 * two results, both "8.5".
 */
BOOST_AUTO_TEST_CASE(IgniteBroadcastRemote)
{
    // The second node only has to exist for the duration of the test.
    Ignite node2 = MakeNode("ComputeNode2");

    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Broadcasting");

    std::vector<std::string> res = compute.Broadcast<std::string>(Func2(8, 5));

    // Fatal on mismatch: the element accesses below must never run on a vector of the wrong size.
    BOOST_REQUIRE_EQUAL(res.size(), 2);

    BOOST_CHECK_EQUAL(res[0], "8.5");
    BOOST_CHECK_EQUAL(res[1], "8.5");
}
/*
 * Asynchronous Broadcast of a Func2 that carries the test error on a
 * two-node cluster. Func2 sleeps for 200 ms before throwing, so the future
 * is expected to be not ready both right after submission and after a
 * WaitFor(100); GetValue() must then throw an IgniteError accepted by
 * IsTestError.
 */
BOOST_AUTO_TEST_CASE(IgniteBroadcastRemoteError)
{
    // The second node only has to exist for the duration of the test.
    Ignite secondNode = MakeNode("ComputeNode2");

    Compute compute = node.GetCompute();

    BOOST_TEST_CHECKPOINT("Broadcasting");

    Func2 failingJob(MakeTestError());
    Future< std::vector<std::string> > res = compute.BroadcastAsync<std::string>(failingJob);

    BOOST_CHECK(!res.IsReady());

    BOOST_TEST_CHECKPOINT("Waiting with timeout");

    res.WaitFor(100);

    BOOST_CHECK(!res.IsReady());

    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
}
BOOST_AUTO_TEST_SUITE_END()
BOOST_FIXTURE_TEST_SUITE(ComputeTestSuiteClusterGroup, ComputeTestSuiteFixtureClusterGroup)
/*
 * Broadcasting through a compute instance bound to ForServers() must reach
 * the three server nodes started by the fixture: exactly three results,
 * all "8.5".
 */
BOOST_AUTO_TEST_CASE(IgniteGetClusterGroupForServers)
{
    ClusterGroup localGroup = client.GetCluster().AsClusterGroup();
    ClusterGroup group = localGroup.ForServers();

    Compute compute = client.GetCompute(group);

    BOOST_TEST_CHECKPOINT("Broadcasting");

    std::vector<std::string> res = compute.Broadcast<std::string>(Func2(8, 5));

    // Fatal on mismatch: the element accesses below must never run on a vector of the wrong size.
    BOOST_REQUIRE_EQUAL(res.size(), 3);

    BOOST_CHECK_EQUAL(res[0], "8.5");
    BOOST_CHECK_EQUAL(res[1], "8.5");
    BOOST_CHECK_EQUAL(res[2], "8.5");
}
/*
 * Broadcasting through compute instances bound to ForAttribute() groups:
 * the "TestAttribute" == "Value0" group must produce one result and the
 * "TestAttribute" == "Value1" group two results, all "8.5".
 * NOTE(review): the attribute values are presumably set by compute-server0.xml
 * (one node) and compute-server1.xml (two nodes) - confirm in the configs.
 */
BOOST_AUTO_TEST_CASE(IgniteGetClusterGroupForAttribute)
{
    ClusterGroup localGroup = client.GetCluster().AsClusterGroup();
    ClusterGroup group1 = localGroup.ForAttribute("TestAttribute", "Value0");
    ClusterGroup group2 = localGroup.ForAttribute("TestAttribute", "Value1");

    Compute compute1 = client.GetCompute(group1);
    Compute compute2 = client.GetCompute(group2);

    BOOST_TEST_CHECKPOINT("Broadcasting1");

    std::vector<std::string> res1 = compute1.Broadcast<std::string>(Func2(8, 5));

    // Fatal on mismatch: the element access below must never run on a vector of the wrong size.
    BOOST_REQUIRE_EQUAL(res1.size(), 1);

    BOOST_CHECK_EQUAL(res1[0], "8.5");

    BOOST_TEST_CHECKPOINT("Broadcasting2");

    std::vector<std::string> res2 = compute2.Broadcast<std::string>(Func2(8, 5));

    // Fatal on mismatch: the element accesses below must never run on a vector of the wrong size.
    BOOST_REQUIRE_EQUAL(res2.size(), 2);

    BOOST_CHECK_EQUAL(res2[0], "8.5");
    BOOST_CHECK_EQUAL(res2[1], "8.5");
}
BOOST_AUTO_TEST_SUITE_END()