blob: cd8f139d2bb31d77df03de35a0a1452f95841d5c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Tests.Cache
{
using System;
using System.Collections.Generic;
using System.Linq;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
using Apache.Ignite.Core.Cache.Configuration;
using NUnit.Framework;
/// <summary>
/// Tests partition loss management functionality:
/// <see cref="PartitionLossPolicy"/>, <see cref="IIgnite.ResetLostPartitions(IEnumerable{string})"/>,
/// <see cref="ICache{TK,TV}.GetLostPartitions"/>, <see cref="ICache{TK,TV}.WithPartitionRecover"/>.
/// </summary>
public class PartitionLossTest
{
/** */
private const string CacheName = "lossTestCache";
/** */
private const string WaitForRebalanceTask = "org.apache.ignite.platform.PlatformWaitForRebalanceTask";
/** */
private const int TimeoutMs = 7000;
/// <summary>
/// Fixture set up.
/// </summary>
[SetUp]
public void SetUp()
{
Ignition.Start(TestUtils.GetTestConfiguration());
}
/// <summary>
/// Fixture tear down.
/// </summary>
[TearDown]
public void TearDown()
{
Ignition.StopAll(true);
}
/// <summary>
/// Tests the ReadOnlySafe mode.
/// </summary>
[Test]
public void TestReadOnlySafe()
{
TestPartitionLoss(PartitionLossPolicy.ReadOnlySafe, false, true);
}
/// <summary>
/// Tests the ReadWriteSafe mode.
/// </summary>
[Test]
public void TestReadWriteSafe()
{
TestPartitionLoss(PartitionLossPolicy.ReadWriteSafe, true, true);
}
/// <summary>
/// Tests the ReadOnlyAll mode.
/// </summary>
[Test]
public void TestReadOnlyAll()
{
TestPartitionLoss(PartitionLossPolicy.ReadOnlyAll, false, true);
}
/// <summary>
/// Tests the ReadWriteAll mode.
/// </summary>
[Test]
public void TestReadWriteAll()
{
TestPartitionLoss(PartitionLossPolicy.ReadWriteAll, true, true);
}
/// <summary>
/// Tests the Ignore mode.
/// </summary>
[Test]
public void TestIgnoreLoss()
{
var ignite = Ignition.GetIgnite();
var cache = CreateCache(PartitionLossPolicy.Ignore, ignite);
var lostPart = PrepareTopology();
Assert.IsEmpty(cache.GetLostPartitions());
cache[lostPart] = lostPart;
Assert.AreEqual(lostPart, cache[lostPart]);
}
/// <summary>
/// Tests the partition loss.
/// </summary>
private static void TestPartitionLoss(PartitionLossPolicy policy, bool canWrite, bool safe)
{
var ignite = Ignition.GetIgnite();
var cache = CreateCache(policy, ignite);
// Loose data and verify lost partition.
var lostPart = PrepareTopology();
TestUtils.WaitForTrueCondition(() => cache.GetLostPartitions().Any(), TimeoutMs);
var lostParts = cache.GetLostPartitions();
Assert.IsTrue(lostParts.Contains(lostPart));
// Check cache operations.
foreach (var part in lostParts)
{
VerifyCacheOperations(cache, part, canWrite, safe);
// Check reads are possible from a cache in recovery mode.
var recoverCache = cache.WithPartitionRecover();
int unused;
Assert.IsFalse(recoverCache.TryGet(part, out unused));
}
// Reset and verify. Test different ResetLostPartitions overloads.
if (canWrite)
{
ignite.ResetLostPartitions(CacheName);
}
else
{
ignite.ResetLostPartitions(new List<string> {CacheName, "foo"});
}
Assert.IsEmpty(cache.GetLostPartitions());
}
/// <summary>
/// Verifies the cache operations.
/// </summary>
private static void VerifyCacheOperations(ICache<int, int> cache, int part, bool canWrite, bool safe)
{
if (safe)
{
int unused;
var ex = Assert.Throws<CacheException>(() => cache.TryGet(part, out unused));
Assert.AreEqual(string.Format(
"class org.apache.ignite.internal.processors.cache.CacheInvalidStateException" +
": Failed to execute the cache operation (all partition owners have left the grid, " +
"partition data has been lost) [cacheName={0}, partition={1}," +
" key=UserKeyCacheObjectImpl [part={1}, val={1}, hasValBytes=false]]",
CacheName, part), ex.Message);
}
else
{
int unused;
Assert.IsFalse(cache.TryGet(part, out unused));
}
if (canWrite)
{
if (safe)
{
var ex = Assert.Throws<CacheException>(() => cache.Put(part, part));
Assert.AreEqual(string.Format(
"class org.apache.ignite.internal.processors.cache.CacheInvalidStateException: " +
"Failed to execute the cache operation (all partition owners have left the grid, " +
"partition data has been lost) [cacheName={0}, partition={1}, key={1}]",
CacheName, part), ex.Message);
}
else
{
cache[part] = part;
Assert.AreEqual(part, cache[part]);
}
}
else
{
var ex = Assert.Throws<CacheException>(() => cache.Put(part, part));
Assert.AreEqual(string.Format(
"class org.apache.ignite.internal.processors.cache.CacheInvalidStateException: " +
"Failed to write to cache (cache is moved to a read-only state): {0}",
CacheName), ex.Message);
}
}
/// <summary>
/// Creates the cache.
/// </summary>
private static ICache<int, int> CreateCache(PartitionLossPolicy policy, IIgnite ignite)
{
return ignite.CreateCache<int, int>(new CacheConfiguration(CacheName)
{
CacheMode = CacheMode.Partitioned,
Backups = 0,
WriteSynchronizationMode = CacheWriteSynchronizationMode.FullSync,
PartitionLossPolicy = policy,
RebalanceDelay = TimeSpan.Zero,
RebalanceMode = CacheRebalanceMode.Sync,
RebalanceThrottle = TimeSpan.Zero,
AffinityFunction = new RendezvousAffinityFunction
{
ExcludeNeighbors = false,
Partitions = 32
}
});
}
/// <summary>
/// Prepares the topology: starts a new node and stops it after rebalance to ensure data loss.
/// </summary>
/// <returns>Lost partition id.</returns>
private static int PrepareTopology()
{
using (var ignite = Ignition.Start(TestUtils.GetTestConfiguration(name: "ignite-2")))
{
var cache = ignite.GetCache<int, int>(CacheName);
var affinity = ignite.GetAffinity(CacheName);
var keys = Enumerable.Range(1, affinity.Partitions).ToArray();
cache.PutAll(keys.ToDictionary(x => x, x => x));
cache.Rebalance();
// Wait for rebalance to complete.
var node = ignite.GetCluster().GetLocalNode();
Func<int, bool> isPrimary = x => affinity.IsPrimary(node, x);
TestUtils.WaitForTrueCondition(() => keys.Any(isPrimary), TimeoutMs);
var expectedTopVer = new AffinityTopologyVersion(2, 1);
Assert.IsTrue(ignite.GetCompute().ExecuteJavaTask<bool>(
WaitForRebalanceTask,
new object[] { CacheName, expectedTopVer.Version, expectedTopVer.MinorVersion, (long)TimeoutMs }));
return keys.First(isPrimary);
}
}
}
}