blob: d6adb4929ad02cf5758cf5e5c927a471c071c96f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// ReSharper disable RedundantCast
namespace Apache.Ignite.Core.Tests.Client.Cache
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Client.DataStructures;
using Apache.Ignite.Core.Common;
using NUnit.Framework;
/// <summary>
/// Tests partition awareness functionality.
/// </summary>
public class PartitionAwarenessTest : ClientTestBase
{
/** */
private const int ServerCount = 3;
/** */
private ICacheClient<int, int> _cache;
/// <summary>
/// Initializes a new instance of the <see cref="PartitionAwarenessTest"/> class.
/// </summary>
public PartitionAwarenessTest()
: base(ServerCount, enableServerListLogging: true)
{
// No-op.
}
/// <summary>
/// Fixture set up.
/// </summary>
public override void FixtureSetUp()
{
base.FixtureSetUp();
_cache = Client.CreateCache<int, int>("c");
// Warm up client partition data.
InitTestData();
_cache.Get(1);
_cache.Get(2);
}
public override void TestSetUp()
{
base.TestSetUp();
InitTestData();
ClearLoggers();
}
[Test]
[TestCase(1, 1)]
[TestCase(2, 0)]
[TestCase(3, 0)]
[TestCase(4, 1)]
[TestCase(5, 1)]
[TestCase(6, 2)]
public void CacheGet_PrimitiveKeyType_RequestIsRoutedToPrimaryNode(int key, int gridIdx)
{
var res = _cache.Get(key);
Assert.AreEqual(key, res);
Assert.AreEqual(gridIdx, GetClientRequestGridIndex());
}
[Test]
[TestCase(1, 1)]
[TestCase(2, 0)]
[TestCase(3, 0)]
[TestCase(4, 1)]
[TestCase(5, 1)]
[TestCase(6, 2)]
public void CacheGetAsync_PrimitiveKeyType_RequestIsRoutedToPrimaryNode(int key, int gridIdx)
{
var res = _cache.GetAsync(key).Result;
Assert.AreEqual(key, res);
Assert.AreEqual(gridIdx, GetClientRequestGridIndex());
}
[Test]
[TestCase(1, 0)]
[TestCase(2, 0)]
[TestCase(3, 0)]
[TestCase(4, 2)]
[TestCase(5, 2)]
[TestCase(6, 1)]
public void CacheGet_UserDefinedKeyType_RequestIsRoutedToPrimaryNode(int key, int gridIdx)
{
var cache = Client.GetOrCreateCache<TestKey, int>("c_custom_key");
cache.PutAll(Enumerable.Range(1, 100).ToDictionary(x => new TestKey(x, x.ToString()), x => x));
cache.Get(new TestKey(1, "1")); // Warm up;
ClearLoggers();
var testKey = new TestKey(key, key.ToString());
var res = cache.Get(testKey);
Assert.AreEqual(key, res);
Assert.AreEqual(gridIdx, GetClientRequestGridIndex());
Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(testKey));
}
[Test]
public void CachePut_UserDefinedTypeWithAffinityKey_ThrowsIgniteException()
{
// Note: annotation-based configuration is not supported on Java side.
// Use manual configuration instead.
var cacheClientConfiguration = new CacheClientConfiguration("c_custom_key_aff")
{
KeyConfiguration = new List<CacheKeyConfiguration>
{
new CacheKeyConfiguration(typeof(TestKeyWithAffinity))
{
AffinityKeyFieldName = "_i"
}
}
};
var cache = Client.GetOrCreateCache<TestKeyWithAffinity, int>(cacheClientConfiguration);
var ex = Assert.Throws<IgniteException>(() => cache.Put(new TestKeyWithAffinity(1, "1"), 1));
var expected = string.Format("Affinity keys are not supported. Object '{0}' has an affinity key.",
typeof(TestKeyWithAffinity));
Assert.AreEqual(expected, ex.Message);
}
#if NETCOREAPP // TODO: IGNITE-15710
[Test]
public void CacheGet_NewNodeEnteredTopology_RequestIsRoutedToNewNode()
{
// Warm-up.
Assert.AreEqual(1, _cache.Get(1));
// Before topology change.
Assert.AreEqual(12, _cache.Get(12));
Assert.AreEqual(1, GetClientRequestGridIndex());
Assert.AreEqual(14, _cache.Get(14));
Assert.AreEqual(2, GetClientRequestGridIndex());
// After topology change.
var cfg = GetIgniteConfiguration();
cfg.AutoGenerateIgniteInstanceName = true;
using (Ignition.Start(cfg))
{
TestUtils.WaitForTrueCondition(() =>
{
// Keys 12 and 14 belong to a new node now (-1).
Assert.AreEqual(12, _cache.Get(12));
if (GetClientRequestGridIndex() != -1)
{
return false;
}
Assert.AreEqual(14, _cache.Get(14));
Assert.AreEqual(-1, GetClientRequestGridIndex());
return true;
}, 6000);
}
}
#endif
[Test]
[TestCase(1, 1)]
[TestCase(2, 0)]
[TestCase(3, 0)]
[TestCase(4, 1)]
[TestCase(5, 1)]
[TestCase(6, 2)]
public void AllKeyBasedOperations_PrimitiveKeyType_RequestIsRoutedToPrimaryNode(int key, int gridIdx)
{
int unused;
TestOperation(() => _cache.Get(key), gridIdx);
TestAsyncOperation(() => _cache.GetAsync(key), gridIdx);
TestOperation(() => _cache.TryGet(key, out unused), gridIdx);
TestAsyncOperation(() => _cache.TryGetAsync(key), gridIdx);
TestOperation(() => _cache.Put(key, key), gridIdx, "Put");
TestAsyncOperation(() => _cache.PutAsync(key, key), gridIdx, "Put");
TestOperation(() => _cache.PutIfAbsent(key, key), gridIdx, "PutIfAbsent");
TestAsyncOperation(() => _cache.PutIfAbsentAsync(key, key), gridIdx, "PutIfAbsent");
TestOperation(() => _cache.GetAndPutIfAbsent(key, key), gridIdx, "GetAndPutIfAbsent");
TestAsyncOperation(() => _cache.GetAndPutIfAbsentAsync(key, key), gridIdx, "GetAndPutIfAbsent");
TestOperation(() => _cache.Clear(key), gridIdx, "ClearKey");
TestAsyncOperation(() => _cache.ClearAsync(key), gridIdx, "ClearKey");
TestOperation(() => _cache.ContainsKey(key), gridIdx, "ContainsKey");
TestAsyncOperation(() => _cache.ContainsKeyAsync(key), gridIdx, "ContainsKey");
TestOperation(() => _cache.GetAndPut(key, key), gridIdx, "GetAndPut");
TestAsyncOperation(() => _cache.GetAndPutAsync(key, key), gridIdx, "GetAndPut");
TestOperation(() => _cache.GetAndReplace(key, key), gridIdx, "GetAndReplace");
TestAsyncOperation(() => _cache.GetAndReplaceAsync(key, key), gridIdx, "GetAndReplace");
TestOperation(() => _cache.GetAndRemove(key), gridIdx, "GetAndRemove");
TestAsyncOperation(() => _cache.GetAndRemoveAsync(key), gridIdx, "GetAndRemove");
TestOperation(() => _cache.Replace(key, key), gridIdx, "Replace");
TestAsyncOperation(() => _cache.ReplaceAsync(key, key), gridIdx, "Replace");
TestOperation(() => _cache.Replace(key, key, key + 1), gridIdx, "ReplaceIfEquals");
TestAsyncOperation(() => _cache.ReplaceAsync(key, key, key + 1), gridIdx, "ReplaceIfEquals");
TestOperation(() => _cache.Remove(key), gridIdx, "RemoveKey");
TestAsyncOperation(() => _cache.RemoveAsync(key), gridIdx, "RemoveKey");
TestOperation(() => _cache.Remove(key, key), gridIdx, "RemoveIfEquals");
TestAsyncOperation(() => _cache.RemoveAsync(key, key), gridIdx, "RemoveIfEquals");
}
[Test]
public void CacheGet_RepeatedCall_DoesNotRequestAffinityMapping()
{
// Test that affinity mapping is not requested when known.
// Start new cache to enforce partition mapping request.
Client.CreateCache<int, int>("repeat-call-test");
ClearLoggers();
_cache.Get(1);
_cache.Get(1);
_cache.Get(1);
var requests = GetAllServerRequestNames(RequestNamePrefixCache);
var expectedRequests = new[]
{
"Partitions",
"Get",
"Get",
"Get"
};
CollectionAssert.AreEquivalent(expectedRequests, requests);
}
[Test]
public void ReplicatedCacheGet_RepeatedCall_DoesNotRequestAffinityMapping()
{
// Test cache for which partition awareness is not applicable.
var cfg = new CacheClientConfiguration("replicated_cache") {CacheMode = CacheMode.Replicated};
var cache = Client.CreateCache<int, int>(cfg);
// Init the replicated cache and start the new one to enforce partition mapping request.
cache.PutAll(Enumerable.Range(1, 3).ToDictionary(x => x, x => x));
Client.CreateCache<int, int>("repeat-call-test-replicated");
ClearLoggers();
cache.Get(1);
cache.Get(2);
cache.Get(3);
var reqs = GetLoggers()
.Select(l => GetServerRequestNames(l, RequestNamePrefixCache).ToArray())
.Where(x => x.Length > 0)
.ToArray();
// All requests should go to a single (default) node, because partition awareness is not applicable.
Assert.AreEqual(1, reqs.Length);
// There should be only one partitions request.
var expectedRequests = new[]
{
"Partitions",
"Get",
"Get",
"Get"
};
Assert.AreEqual(expectedRequests, reqs[0]);
}
[Test]
public void CacheGet_PartitionAwarenessDisabled_RequestIsRoutedToDefaultNode()
{
var cfg = GetClientConfiguration();
cfg.EnablePartitionAwareness = false;
using (var client = Ignition.StartClient(cfg))
{
var cache = client.GetCache<int, int>(_cache.Name);
var requestTargets = Enumerable
.Range(1, 10)
.Select(x =>
{
cache.Get(x);
return GetClientRequestGridIndex();
})
.Distinct()
.ToArray();
// Partition awareness disabled - all requests go to same socket, picked with round-robin on connect.
Assert.AreEqual(1, requestTargets.Length);
}
}
// ReSharper disable RedundantExplicitArrayCreation
[Test]
[TestCase(1, 1)]
[TestCase(2, 0)]
[TestCase((uint) 1, 1)]
[TestCase(uint.MaxValue, 0)]
[TestCase((byte) 1, 1)]
[TestCase((byte) 2, 0)]
[TestCase((byte) 131, 1)]
[TestCase((sbyte) 1, 1)]
[TestCase((sbyte) -2, 1)]
[TestCase((short) 1, 1)]
[TestCase((short) 2, 0)]
[TestCase((ushort) 1, 1)]
[TestCase(ushort.MaxValue, 0)]
[TestCase((long) 1, 1)]
[TestCase((long) 2, 0)]
[TestCase((ulong) 1, 1)]
[TestCase(ulong.MaxValue, 0)]
[TestCase((float) 1.3, 0)]
[TestCase((float) 1.4, 2)]
[TestCase((double) 51.3, 1)]
[TestCase((double) 415.5, 0)]
[TestCase((double) 325.5, 0)]
[TestCase((double) 255.5, 1)]
[TestCase('1', 2)]
[TestCase('2', 1)]
[TestCase("1", 2)]
[TestCase("2", 1)]
[TestCase("Hello World", 0)]
[TestCase("Тест1", 1)]
[TestCase("🙂🔥😎", 2)]
[TestCase(true, 1)]
[TestCase(false, 1)]
[TestCase(new[]{true, false}, 1)]
[TestCase(new byte[]{1, 2}, 2)]
[TestCase(new sbyte[]{1, -2}, 0)]
[TestCase(new short[]{1, 3}, 2)]
[TestCase(new ushort[]{1, 4}, 2)]
[TestCase(new int[]{1, 5}, 2)]
[TestCase(new uint[]{1, 6}, 1)]
[TestCase(new long[]{1, 7}, 0)]
[TestCase(new ulong[]{1, 8}, 0)]
[TestCase(new float[]{1.1f, 9.9f}, 1)]
[TestCase(new double[]{1.2f, 19.19f}, 1)]
[TestCase(new char[]{'x', 'y'}, 1)]
[TestCase(new string[]{"Hello", "World"}, 2)]
// ReSharper restore RedundantExplicitArrayCreation
public void CachePut_AllPrimitiveTypes_RequestIsRoutedToPrimaryNode(object key, int gridIdx)
{
var cache = Client.GetCache<object, object>(_cache.Name);
TestOperation(() => cache.Put(key, key), gridIdx, "Put");
// Verify against real Affinity.
Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key), "Actual primary node is different");
}
[Test]
[TestCase("00000000-0000-0000-0000-000000000000", 0)]
[TestCase("0cb85a41-bd0d-405b-8f34-f515e8aabc39", 0)]
[TestCase("b4addd17-218c-4054-a5fa-03c88f5ee71c", 0)]
[TestCase("2611e3d2-618d-43b9-a318-2f5039f82568", 1)]
[TestCase("1dd8bfae-29b8-4949-aa99-7c9bfabe2566", 2)]
public void CachePut_GuidKey_RequestIsRoutedToPrimaryNode(string keyString, int gridIdx)
{
var key = Guid.Parse(keyString);
var cache = Client.GetCache<object, object>(_cache.Name);
TestOperation(() => cache.Put(key, key), gridIdx, "Put");
// Verify against real Affinity.
Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key));
}
[Test]
[TestCase("2015-01-01T00:00:00.0000000Z", 2)]
[TestCase("2016-02-02T00:00:00.0000000Z", 2)]
[TestCase("2017-03-03T00:00:00.0000000Z", 0)]
public void CachePut_DateTimeKey_RequestIsRoutedToPrimaryNode(string keyString, int gridIdx)
{
var key = DateTime.Parse(keyString, CultureInfo.InvariantCulture).ToUniversalTime();
var cache = Client.GetCache<object, object>(_cache.Name);
TestOperation(() => cache.Put(key, key), gridIdx, "Put");
// Verify against real Affinity.
Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key));
}
[Test]
[TestCase(1, 1)]
[TestCase(2, 0)]
public void CachePut_IntPtrKeyKey_RequestIsRoutedToPrimaryNode(int keyInt, int gridIdx)
{
var key = new IntPtr(keyInt);
var cache = Client.GetCache<object, object>(_cache.Name);
TestOperation(() => cache.Put(key, key), gridIdx, "Put");
// Verify against real Affinity.
Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key));
}
[Test]
[TestCase(1, 1)]
[TestCase(2, 0)]
public void CachePut_UIntPtrKeyKey_RequestIsRoutedToPrimaryNode(int keyInt, int gridIdx)
{
var key = new UIntPtr((uint) keyInt);
var cache = Client.GetCache<object, object>(_cache.Name);
TestOperation(() => cache.Put(key, key), gridIdx, "Put");
// Verify against real Affinity.
Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key));
}
[Test]
[TestCase(1, 1)]
[TestCase(2, 0)]
[TestCase(3, 0)]
[TestCase(4, 1)]
[TestCase(5, 1)]
[TestCase(6, 2)]
public void DataStreamer_PrimitiveKeyType_RequestIsRoutedToPrimaryNode(int key, int gridIdx)
{
using (var streamer = Client.GetDataStreamer<int, int>(_cache.Name))
{
streamer.Add(key, key);
}
Assert.AreEqual(gridIdx, GetClientRequestGridIndex("Start", RequestNamePrefixStreamer));
}
[Test]
[TestCase("default-grp-partitioned", null, CacheMode.Partitioned, 0)]
[TestCase("default-grp-replicated", null, CacheMode.Replicated, 2)]
[TestCase("custom-grp-partitioned", "testAtomicLong", CacheMode.Partitioned, 1)]
[TestCase("custom-grp-replicated", "testAtomicLong", CacheMode.Replicated, 0)]
public void AtomicLong_RequestIsRoutedToPrimaryNode(
string name, string groupName, CacheMode cacheMode, int gridIdx)
{
var cfg = new AtomicClientConfiguration
{
GroupName = groupName,
CacheMode = cacheMode
};
var atomicLong = Client.GetAtomicLong(name, cfg, 1, true);
// Warm up.
atomicLong.Read();
ClearLoggers();
// Test.
atomicLong.Read();
Assert.AreEqual(gridIdx, GetClientRequestGridIndex("ValueGet", "datastructures.ClientAtomicLong"));
}
[Test]
[TestCase("default-grp-partitioned-set", null, CacheMode.Partitioned, 1, 1)]
[TestCase("default-grp-partitioned-set", null, CacheMode.Partitioned, 2, 0)]
[TestCase("default-grp-partitioned-set", null, CacheMode.Partitioned, 3, 0)]
[TestCase("default-grp-partitioned-set", null, CacheMode.Partitioned, 4, 1)]
[TestCase("custom-grp-partitioned-set", "testIgniteSet1", CacheMode.Partitioned, 1, 1)]
[TestCase("custom-grp-partitioned-set", "testIgniteSet1", CacheMode.Partitioned, 3, 0)]
[TestCase("custom-grp-replicated-set", "testIgniteSet2", CacheMode.Replicated, 1, 1)]
[TestCase("custom-grp-replicated-set", "testIgniteSet2", CacheMode.Replicated, 3, 1)]
public void IgniteSet_RequestIsRoutedToPrimaryNode(
string name, string groupName, CacheMode cacheMode, int item, int gridIdx)
{
var cfg = new CollectionClientConfiguration
{
GroupName = groupName,
CacheMode = cacheMode,
Backups = 1
};
var igniteSet = Client.GetIgniteSet<int>(name, cfg);
// Warm up.
igniteSet.Add(0);
ClearLoggers();
// Test.
igniteSet.Add(item);
Assert.AreEqual(gridIdx, GetClientRequestGridIndex("ValueAdd", "datastructures.ClientIgniteSet"));
}
[Test]
[TestCase("default-grp-partitioned-set-2", null, CacheMode.Partitioned, 1, 0)]
[TestCase("default-grp-partitioned-set-2", null, CacheMode.Partitioned, 2, 0)]
[TestCase("default-grp-partitioned-set-2", null, CacheMode.Partitioned, 3, 0)]
[TestCase("default-grp-partitioned-set-2", null, CacheMode.Partitioned, 4, 0)]
[TestCase("custom-grp-partitioned-set-2", "testIgniteSetColocated1", CacheMode.Partitioned, 1, 1)]
[TestCase("custom-grp-partitioned-set-2", "testIgniteSetColocated1", CacheMode.Partitioned, 2, 1)]
[TestCase("custom-grp-partitioned-set-2", "testIgniteSetColocated1", CacheMode.Partitioned, 3, 1)]
[TestCase("custom-grp-replicated-set-2", "testIgniteSetColocated2", CacheMode.Replicated, 1, 1)]
[TestCase("custom-grp-replicated-set-2", "testIgniteSetColocated2", CacheMode.Replicated, 2, 1)]
[TestCase("custom-grp-replicated-set-2", "testIgniteSetColocated2", CacheMode.Replicated, 3, 1)]
public void IgniteSetColocated_RequestIsRoutedToPrimaryNode(
string name, string groupName, CacheMode cacheMode, int item, int gridIdx)
{
var cfg = new CollectionClientConfiguration
{
GroupName = groupName,
CacheMode = cacheMode,
Colocated = true,
Backups = 1
};
var igniteSet = Client.GetIgniteSet<int>(name, cfg);
// Warm up.
igniteSet.Add(0);
ClearLoggers();
// Test.
igniteSet.Add(1);
Assert.AreEqual(gridIdx, GetClientRequestGridIndex("ValueAdd", "datastructures.ClientIgniteSet"));
}
protected override IgniteClientConfiguration GetClientConfiguration()
{
var cfg = base.GetClientConfiguration();
cfg.EnablePartitionAwareness = true;
cfg.Endpoints.Add(string.Format("{0}:{1}", IPAddress.Loopback, IgniteClientConfiguration.DefaultPort + 1));
cfg.Endpoints.Add(string.Format("{0}:{1}", IPAddress.Loopback, IgniteClientConfiguration.DefaultPort + 2));
return cfg;
}
private int GetClientRequestGridIndex(string message = null, string prefix = null)
{
message = message ?? "Get";
try
{
for (var i = 0; i < ServerCount; i++)
{
var requests = GetServerRequestNames(i, prefix ?? RequestNamePrefixCache);
if (requests.Contains(message))
{
return i;
}
}
return -1;
}
finally
{
ClearLoggers();
}
}
private void TestOperation(Action action, int expectedGridIdx, string message = null)
{
InitTestData();
ClearLoggers();
action();
Assert.AreEqual(expectedGridIdx, GetClientRequestGridIndex(message));
}
private void TestAsyncOperation<T>(Func<T> action, int expectedGridIdx, string message = null)
where T : Task
{
ClearLoggers();
action().Wait();
Assert.AreEqual(expectedGridIdx, GetClientRequestGridIndex(message));
}
private void InitTestData()
{
_cache.PutAll(Enumerable.Range(1, 100).ToDictionary(x => x, x => x));
}
private int GetPrimaryNodeIdx<T>(T key)
{
var idx = 0;
// GetAll is not ordered - sort the same way as _loggers.
var ignites = Ignition.GetAll().OrderBy(i => i.Name);
foreach (var ignite in ignites)
{
var aff = ignite.GetAffinity(_cache.Name);
var localNode = ignite.GetCluster().GetLocalNode();
if (aff.IsPrimary(localNode, key))
{
return idx;
}
idx++;
}
throw new InvalidOperationException("Can't determine primary node");
}
}
}