blob: 516016f2cd1e527c24f4657e959521ceca9b2e85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Tests;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
using Ignite.Compute;
using Ignite.Table;
using Ignite.Transactions;
using Internal.Proto;
using Internal.Transactions;
using NUnit.Framework;
/// <summary>
/// Tests partition awareness.
/// </summary>
public class PartitionAwarenessTests
{
// Cases for the [TestCaseSource] tests below. Each entry is { key, node }:
// node 1 means the request is expected on _server1, any other value means _server2
// (the tests map it with `node == 1 ? _server1 : _server2`).
// The last three entries cover the int boundary values.
private static readonly object[] KeyNodeCases =
{
new object[] { 0, 1 },
new object[] { 1, 2 },
new object[] { 3, 1 },
new object[] { 4, 2 },
new object[] { 5, 2 },
new object[] { 8, 2 },
new object[] { int.MaxValue, 2 },
new object[] { int.MaxValue - 1, 2 },
new object[] { int.MinValue, 2 }
};
// Fake servers; assigned in SetUp before every test and disposed in TearDown.
private FakeServer _server1 = null!;
private FakeServer _server2 = null!;
/// <summary>
/// Creates two fake servers ("srv1" and "srv2") and gives both the same partition assignment
/// (srv1's node name, then srv2's) with a timestamp that lies one day in the past.
/// </summary>
[SetUp]
public void SetUp()
{
    _server1 = new FakeServer(nodeName: "srv1");
    _server2 = new FakeServer(nodeName: "srv2");

    var nodeNames = new[] { _server1.Node.Name, _server2.Node.Name };

    // Old assignment.
    var staleTimestamp = DateTime.UtcNow.AddDays(-1).Ticks;

    _server1.PartitionAssignment = nodeNames;
    _server1.PartitionAssignmentTimestamp = staleTimestamp;

    _server2.PartitionAssignment = nodeNames;
    _server2.PartitionAssignmentTimestamp = staleTimestamp;
}
/// <summary>
/// Disposes both fake servers after every test.
/// </summary>
[TearDown]
public void TearDown()
{
    try
    {
        _server1.Dispose();
    }
    finally
    {
        // Dispose the second server even when disposing the first one throws,
        // so that a failure in one Dispose call does not leak the other server.
        _server2.Dispose();
    }
}
/// <summary>
/// Verifies that an upsert for each listed key is recorded by the expected server
/// while the other server records no operations.
/// </summary>
[Test]
public async Task TestPutRoutesRequestToPrimaryNode()
{
    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
    var view = table!.GetRecordView<int>();

    // Warm up.
    await view.UpsertAsync(null, 1);

    // Check: key, server that must receive the upsert, server that must stay idle.
    var expectations = new (int Key, FakeServer Target, FakeServer Idle)[]
    {
        (1, _server2, _server1),
        (3, _server1, _server2),
        (4, _server2, _server1),
        (7, _server1, _server2)
    };

    foreach (var (key, target, idle) in expectations)
    {
        await AssertOpOnNode(async tx => await view.UpsertAsync(tx, key), ClientOp.TupleUpsert, target, idle);
    }
}
/// <summary>
/// Starts a transaction with <c>_server1</c> as the preferred node, performs ten upserts (keys 0..9)
/// within that transaction and verifies that the last ten operations recorded by <c>_server1</c>
/// are all upserts.
/// </summary>
[Test]
public async Task TestPutWithTxUsesTxNode()
{
    const int upsertCount = 10;

    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
    var view = table!.GetRecordView<int>();

    _server1.ClearOps();

    var tx = await client.Transactions.BeginAsync();
    await TestUtils.ForceLazyTxStart(tx, client, PreferredNode.FromName(_server1.Node.Name));

    // After the forced start, the only operation recorded so far must be the transaction begin.
    Assert.AreEqual(ClientOp.TxBegin, _server1.ClientOps.Single());

    foreach (var key in Enumerable.Range(0, upsertCount))
    {
        await view.UpsertAsync(tx, key);
    }

    var expectedOps = Enumerable.Repeat(ClientOp.TupleUpsert, upsertCount);
    Assert.AreEqual(expectedOps, _server1.ClientOps.TakeLast(upsertCount));
}
/// <summary>
/// Runs the shared partition assignment update scenario with a record view upsert of key 1.
/// </summary>
[Test]
public async Task TestClientReceivesPartitionAssignmentUpdates()
{
    Func<IRecordView<int>, ITransaction?, Task> upsert = (view, tx) => view.UpsertAsync(tx, 1);

    await TestClientReceivesPartitionAssignmentUpdates(upsert, ClientOp.TupleUpsert);
}
/// <summary>
/// Runs the shared partition assignment update scenario with a data streamer that sends a single item.
/// </summary>
[Test]
public async Task TestDataStreamerReceivesPartitionAssignmentUpdates()
{
    // The transaction argument is ignored: the streamer call does not take one.
    Func<IRecordView<int>, ITransaction?, Task> stream =
        (view, _) => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable());

    await TestClientReceivesPartitionAssignmentUpdates(stream, ClientOp.StreamerBatchSend);
}
/// <summary>
/// Runs the shared partition assignment update scenario with the receiver overload of the data streamer,
/// sending a single item.
/// </summary>
[Test]
public async Task TestDataStreamerWithReceiverReceivesPartitionAssignmentUpdates()
{
    // The transaction argument is ignored: the streamer call does not take one.
    Func<IRecordView<int>, ITransaction?, Task> streamWithReceiver = (view, _) => view.StreamDataAsync(
        new[] { 1 }.ToAsyncEnumerable(),
        keySelector: x => x,
        payloadSelector: x => x.ToString(),
        new ReceiverDescriptor<object?>("x"),
        null);

    await TestClientReceivesPartitionAssignmentUpdates(streamWithReceiver, ClientOp.StreamerWithReceiverBatchSend);
}
/// <summary>
/// Verifies that a data streamer which is already running sends batches to <c>_server2</c> under the
/// default assignment and to <c>_server1</c> after the partition assignment has been reversed.
/// </summary>
/// <param name="withReceiver">
/// When true, the receiver overload of StreamDataAsync is used and
/// <see cref="ClientOp.StreamerWithReceiverBatchSend"/> is expected; otherwise
/// <see cref="ClientOp.StreamerBatchSend"/> is expected.
/// </param>
[Test]
public async Task TestDataStreamerReceivesPartitionAssignmentUpdatesWhileStreaming([Values(true, false)] bool withReceiver)
{
    var clientOp = withReceiver ? ClientOp.StreamerWithReceiverBatchSend : ClientOp.StreamerBatchSend;
    var producer = Channel.CreateUnbounded<int>();

    using var client = await GetClient();
    var recordView = (await client.Tables.GetTableAsync(FakeServer.ExistingTableName))!.GetRecordView<int>();

    // NOTE(review): PageSize = 1 presumably makes every written item go out as its own batch - confirm.
    var options = new DataStreamerOptions { PageSize = 1 };
    var data = producer.Reader.ReadAllAsync();
    var receiverDescriptor = new ReceiverDescriptor<object?>("x");

    // The streamer is started here and keeps running until the channel writer is completed.
    var streamerTask = withReceiver
        ? recordView.StreamDataAsync(data, x => x, x => x.ToString(), receiverDescriptor, null, options)
        : recordView.StreamDataAsync(data, options);

    try
    {
        // Feeds one item into the running streamer and waits until either server records the batch op.
        Func<ITransaction?, Task> action = async _ =>
        {
            await producer.Writer.WriteAsync(1);

            TestUtils.WaitForCondition(
                () => new[] { _server1, _server2 }.SelectMany(x => x.ClientOps).Contains(clientOp));
        };

        // Check default assignment.
        await recordView.UpsertAsync(null, 1);
        await AssertOpOnNode(action, clientOp, _server2);

        // Update assignment - first request receives update flag.
        ReversePartitionAssignment();
        await client.Tables.GetTablesAsync();

        // Second request loads and uses new assignment.
        await AssertOpOnNode(action, clientOp, _server1, allowExtraOps: true);
    }
    finally
    {
        // End streaming. Done in finally so that a failed assertion above does not leave
        // the streamer task waiting for more channel data indefinitely.
        producer.Writer.TryComplete();
    }

    await streamerTask;
}
/// <summary>
/// Verifies that each listed operation of the record binary view is recorded by the server expected for the key.
/// </summary>
/// <param name="keyId">Value of the "ID" column of the key tuple.</param>
/// <param name="node">Expected server: 1 for <c>_server1</c>, anything else for <c>_server2</c>.</param>
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
public async Task TestAllRecordBinaryViewOperations(int keyId, int node)
{
    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
    var view = table!.RecordBinaryView;
    var target = node == 1 ? _server1 : _server2;

    var key = new IgniteTuple { ["ID"] = keyId };
    var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new IgniteTuple { ["ID"] = keyId + 1 } };

    // Warm up (retrieve assignment).
    await view.UpsertAsync(null, key);

    var checks = new (Func<ITransaction?, Task> Run, ClientOp Op)[]
    {
        // Single-key operations.
        (tx => view.GetAsync(tx, key), ClientOp.TupleGet),
        (tx => view.GetAndDeleteAsync(tx, key), ClientOp.TupleGetAndDelete),
        (tx => view.GetAndReplaceAsync(tx, key), ClientOp.TupleGetAndReplace),
        (tx => view.GetAndUpsertAsync(tx, key), ClientOp.TupleGetAndUpsert),
        (tx => view.UpsertAsync(tx, key), ClientOp.TupleUpsert),
        (tx => view.InsertAsync(tx, key), ClientOp.TupleInsert),
        (tx => view.ReplaceAsync(tx, key), ClientOp.TupleReplace),
        (tx => view.ReplaceAsync(tx, key, key), ClientOp.TupleReplaceExact),
        (tx => view.DeleteAsync(tx, key), ClientOp.TupleDelete),
        (tx => view.DeleteExactAsync(tx, key), ClientOp.TupleDeleteExact),
        (_ => view.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.StreamerBatchSend),

        // Multi-key operations use the first key for colocation.
        (tx => view.GetAllAsync(tx, keys), ClientOp.TupleGetAll),
        (tx => view.InsertAllAsync(tx, keys), ClientOp.TupleInsertAll),
        (tx => view.UpsertAllAsync(tx, keys), ClientOp.TupleUpsertAll),
        (tx => view.DeleteAllAsync(tx, keys), ClientOp.TupleDeleteAll),
        (tx => view.DeleteAllExactAsync(tx, keys), ClientOp.TupleDeleteAllExact)
    };

    foreach (var (run, op) in checks)
    {
        await AssertOpOnNode(run, op, target);
    }
}
/// <summary>
/// Verifies that each listed operation of the typed record view is recorded by the server expected for the key.
/// </summary>
/// <param name="key">Key to operate on.</param>
/// <param name="node">Expected server: 1 for <c>_server1</c>, anything else for <c>_server2</c>.</param>
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
public async Task TestAllRecordViewOperations(int key, int node)
{
    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
    var view = table!.GetRecordView<int>();
    var target = node == 1 ? _server1 : _server2;

    var keys = new[] { key, key - 1, key + 1 };

    // Warm up (retrieve assignment).
    await view.UpsertAsync(null, 1);

    var checks = new (Func<ITransaction?, Task> Run, ClientOp Op)[]
    {
        // Single-key operations.
        (tx => view.GetAsync(tx, key), ClientOp.TupleGet),
        (tx => view.GetAndDeleteAsync(tx, key), ClientOp.TupleGetAndDelete),
        (tx => view.GetAndReplaceAsync(tx, key), ClientOp.TupleGetAndReplace),
        (tx => view.GetAndUpsertAsync(tx, key), ClientOp.TupleGetAndUpsert),
        (tx => view.UpsertAsync(tx, key), ClientOp.TupleUpsert),
        (tx => view.InsertAsync(tx, key), ClientOp.TupleInsert),
        (tx => view.ReplaceAsync(tx, key), ClientOp.TupleReplace),
        (tx => view.ReplaceAsync(tx, key, key), ClientOp.TupleReplaceExact),
        (tx => view.DeleteAsync(tx, key), ClientOp.TupleDelete),
        (tx => view.DeleteExactAsync(tx, key), ClientOp.TupleDeleteExact),
        (_ => view.StreamDataAsync(new[] { key }.ToAsyncEnumerable()), ClientOp.StreamerBatchSend),

        // Multi-key operations use the first key for colocation.
        (tx => view.GetAllAsync(tx, keys), ClientOp.TupleGetAll),
        (tx => view.InsertAllAsync(tx, keys), ClientOp.TupleInsertAll),
        (tx => view.UpsertAllAsync(tx, keys), ClientOp.TupleUpsertAll),
        (tx => view.DeleteAllAsync(tx, keys), ClientOp.TupleDeleteAll),
        (tx => view.DeleteAllExactAsync(tx, keys), ClientOp.TupleDeleteAllExact)
    };

    foreach (var (run, op) in checks)
    {
        await AssertOpOnNode(run, op, target);
    }
}
/// <summary>
/// Verifies that each listed operation of the key-value binary view is recorded by the server expected for the key.
/// </summary>
/// <param name="keyId">Value of the "ID" column of the key tuple.</param>
/// <param name="node">Expected server: 1 for <c>_server1</c>, anything else for <c>_server2</c>.</param>
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
public async Task TestAllKeyValueBinaryViewOperations(int keyId, int node)
{
    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
    var view = table!.KeyValueBinaryView;
    var target = node == 1 ? _server1 : _server2;

    var key = new IgniteTuple { ["ID"] = keyId };
    var val = new IgniteTuple();
    var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new IgniteTuple { ["ID"] = keyId + 1 } };
    var pairs = keys.ToDictionary(x => (IIgniteTuple)x, _ => (IIgniteTuple)val);

    // Warm up (retrieve assignment).
    await view.PutAsync(null, key, val);

    var checks = new (Func<ITransaction?, Task> Run, ClientOp Op)[]
    {
        // Single-key operations.
        (tx => view.GetAsync(tx, key), ClientOp.TupleGet),
        (tx => view.GetAndRemoveAsync(tx, key), ClientOp.TupleGetAndDelete),
        (tx => view.GetAndReplaceAsync(tx, key, val), ClientOp.TupleGetAndReplace),
        (tx => view.GetAndPutAsync(tx, key, val), ClientOp.TupleGetAndUpsert),
        (tx => view.PutAsync(tx, key, val), ClientOp.TupleUpsert),
        (tx => view.PutIfAbsentAsync(tx, key, val), ClientOp.TupleInsert),
        (tx => view.ReplaceAsync(tx, key, val), ClientOp.TupleReplace),
        (tx => view.ReplaceAsync(tx, key, val, val), ClientOp.TupleReplaceExact),
        (tx => view.RemoveAsync(tx, key), ClientOp.TupleDelete),
        (tx => view.RemoveAsync(tx, key, val), ClientOp.TupleDeleteExact),
        (tx => view.ContainsAsync(tx, key), ClientOp.TupleContainsKey),

        // Multi-key operations use the first key for colocation.
        (tx => view.GetAllAsync(tx, keys), ClientOp.TupleGetAll),
        (tx => view.PutAllAsync(tx, pairs), ClientOp.TupleUpsertAll),
        (tx => view.RemoveAllAsync(tx, keys), ClientOp.TupleDeleteAll),
        (tx => view.RemoveAllAsync(tx, pairs), ClientOp.TupleDeleteAllExact),
        (_ => view.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.StreamerBatchSend)
    };

    foreach (var (run, op) in checks)
    {
        await AssertOpOnNode(run, op, target);
    }
}
/// <summary>
/// Verifies that each listed operation of the typed key-value view is recorded by the server expected for the key.
/// </summary>
/// <param name="key">Key to operate on.</param>
/// <param name="node">Expected server: 1 for <c>_server1</c>, anything else for <c>_server2</c>.</param>
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
public async Task TestAllKeyValueViewOperations(int key, int node)
{
    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
    var view = table!.GetKeyValueView<int, int>();
    var target = node == 1 ? _server1 : _server2;

    var val = 0;
    var keys = new[] { key, key - 1, key + 1 };
    var pairs = keys.ToDictionary(x => x, _ => val);

    // Warm up (retrieve assignment).
    await view.PutAsync(null, 1, val);

    var checks = new (Func<ITransaction?, Task> Run, ClientOp Op)[]
    {
        // Single-key operations.
        (tx => view.GetAsync(tx, key), ClientOp.TupleGet),
        (tx => view.GetAndRemoveAsync(tx, key), ClientOp.TupleGetAndDelete),
        (tx => view.GetAndReplaceAsync(tx, key, val), ClientOp.TupleGetAndReplace),
        (tx => view.GetAndPutAsync(tx, key, val), ClientOp.TupleGetAndUpsert),
        (tx => view.PutAsync(tx, key, val), ClientOp.TupleUpsert),
        (tx => view.PutIfAbsentAsync(tx, key, val), ClientOp.TupleInsert),
        (tx => view.ReplaceAsync(tx, key, val), ClientOp.TupleReplace),
        (tx => view.ReplaceAsync(tx, key, val, val), ClientOp.TupleReplaceExact),
        (tx => view.RemoveAsync(tx, key), ClientOp.TupleDelete),
        (tx => view.RemoveAsync(tx, key, val), ClientOp.TupleDeleteExact),
        (tx => view.ContainsAsync(tx, key), ClientOp.TupleContainsKey),
        (
            _ => view.StreamDataAsync(new[] { new KeyValuePair<int, int>(key, val) }.ToAsyncEnumerable()),
            ClientOp.StreamerBatchSend),

        // Multi-key operations use the first key for colocation.
        (tx => view.GetAllAsync(tx, keys), ClientOp.TupleGetAll),
        (tx => view.PutAllAsync(tx, pairs), ClientOp.TupleUpsertAll),
        (tx => view.RemoveAllAsync(tx, keys), ClientOp.TupleDeleteAll),
        (tx => view.RemoveAllAsync(tx, pairs), ClientOp.TupleDeleteAllExact)
    };

    foreach (var (run, op) in checks)
    {
        await AssertOpOnNode(run, op, target);
    }
}
/// <summary>
/// Verifies which server records an upsert for several (string, Guid) composite keys
/// in the composite key table.
/// </summary>
[Test]
public async Task TestCompositeKey()
{
    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.CompositeKeyTableName);
    var view = table!.GetRecordView<CompositeKey>();

    // Warm up.
    await view.UpsertAsync(null, new CompositeKey("1", Guid.Empty));

    var guidB = Guid.Parse("b0000000-0000-0000-0000-000000000000");

    // Changing either key part changes the expected server.
    var cases = new (string IdStr, Guid IdGuid, FakeServer Node)[]
    {
        ("1", Guid.Empty, _server1),
        ("1", guidB, _server2),
        ("c", Guid.Empty, _server2),
        ("c", guidB, _server1)
    };

    foreach (var (idStr, idGuid, node) in cases)
    {
        await AssertOpOnNode(tx => view.UpsertAsync(tx, new CompositeKey(idStr, idGuid)), ClientOp.TupleUpsert, node);
    }
}
/// <summary>
/// Verifies which server records an upsert in the custom colocation key table when the Guid part
/// of the key is random.
/// </summary>
[Test]
public async Task TestCustomColocationKey()
{
    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.CustomColocationKeyTableName);
    var view = table!.GetRecordView<CompositeKey>();

    // Warm up.
    await view.UpsertAsync(null, new CompositeKey("1", Guid.Empty));

    // Both columns are part of key, but only string column is colocation key, so random Guid does not affect the hash.
    var cases = new (string IdStr, Guid IdGuid, FakeServer Node)[]
    {
        ("1", Guid.NewGuid(), _server2),
        ("c", Guid.NewGuid(), _server1)
    };

    foreach (var (idStr, idGuid, node) in cases)
    {
        await AssertOpOnNode(tx => view.UpsertAsync(tx, new CompositeKey(idStr, idGuid)), ClientOp.TupleUpsert, node);
    }
}
/// <summary>
/// Verifies that a colocated compute job submitted with a tuple key is recorded by the server expected for the key.
/// </summary>
/// <param name="keyId">Value of the "ID" column of the key tuple.</param>
/// <param name="node">Expected server: 1 for <c>_server1</c>, anything else for <c>_server2</c>.</param>
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
public async Task TestExecuteColocatedTupleKeyRoutesRequestToPrimaryNode(int keyId, int node)
{
    using var client = await GetClient();

    var tupleKey = new IgniteTuple { ["ID"] = keyId };
    var target = JobTarget.Colocated(FakeServer.ExistingTableName, tupleKey);
    var descriptor = new JobDescriptor<object?, object?>("job");
    var expectedServer = node == 1 ? _server1 : _server2;

    // Warm up.
    await client.Compute.SubmitAsync(target, descriptor, null);

    // The transaction argument is ignored: the submit call does not take one.
    Func<ITransaction?, Task> submit = _ => client.Compute.SubmitAsync(target, descriptor, null);

    await AssertOpOnNode(submit, ClientOp.ComputeExecuteColocated, expectedServer);
}
/// <summary>
/// Verifies that a colocated compute job submitted with a <c>SimpleKey</c> object key is recorded by
/// the server expected for the key.
/// </summary>
/// <param name="keyId">Value passed to the <c>SimpleKey</c> constructor.</param>
/// <param name="node">Expected server: 1 for <c>_server1</c>, anything else for <c>_server2</c>.</param>
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
public async Task TestExecuteColocatedObjectKeyRoutesRequestToPrimaryNode(int keyId, int node)
{
    using var client = await GetClient();

    var objectKey = new SimpleKey(keyId);
    var target = JobTarget.Colocated(FakeServer.ExistingTableName, objectKey);
    var descriptor = new JobDescriptor<object?, object?>("job");
    var expectedServer = node == 1 ? _server1 : _server2;

    // Warm up.
    await client.Compute.SubmitAsync(target, descriptor, null);

    // The transaction argument is ignored: the submit call does not take one.
    Func<ITransaction?, Task> submit = _ => client.Compute.SubmitAsync(target, descriptor, null);

    await AssertOpOnNode(submit, ClientOp.ComputeExecuteColocated, expectedServer);
}
/// <summary>
/// Verifies that a reversed partition assignment carrying an older timestamp, offered by one server,
/// does not change where an upsert for key 1 is sent.
/// </summary>
[Test]
public async Task TestOldAssignmentIsIgnored()
{
    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
    var view = table!.GetRecordView<int>();

    // Key 1 must be handled by _server2 both before and after the stale update.
    Task AssertKeyOneOnServer2() =>
        AssertOpOnNode(tx => view.UpsertAsync(tx, 1), ClientOp.TupleUpsert, _server2);

    // Check default assignment.
    await view.UpsertAsync(null, 1);
    await AssertKeyOneOnServer2();

    // One server has old assignment
    _server1.PartitionAssignment = Enumerable.Reverse(_server1.PartitionAssignment).ToArray();
    _server1.PartitionAssignmentTimestamp -= 1000;

    // Multiple requests to receive timestamp from all servers.
    var remainingRequests = 10;
    while (remainingRequests-- > 0)
    {
        await client.Tables.GetTablesAsync();
    }

    // Check that assignment has not changed - update with old timestamp was ignored.
    _server1.ClearOps();
    _server2.ClearOps();

    await view.UpsertAsync(null, 1);
    await AssertKeyOneOnServer2();
}
/// <summary>
/// Runs <paramref name="action"/> without a transaction and asserts that <paramref name="op"/> was recorded
/// by <paramref name="node"/>; then, unless the operation is a streamer batch send or a colocated compute
/// execution, repeats the same check with a transaction.
/// </summary>
/// <param name="action">Operation under test; receives the transaction to use, or null.</param>
/// <param name="op">Operation code expected on <paramref name="node"/>.</param>
/// <param name="node">Server expected to record the operation.</param>
/// <param name="node2">Optional server expected to record nothing.</param>
/// <param name="allowExtraOps">When true, other operations recorded by <paramref name="node"/> are tolerated.</param>
private static async Task AssertOpOnNode(
    Func<ITransaction?, Task> action,
    ClientOp op,
    FakeServer node,
    FakeServer? node2 = null,
    bool allowExtraOps = false)
{
    await AssertOpOnNodeInner(action, op, node, node2, allowExtraOps, withTx: false);

    // NOTE(review): these operations are only checked without a transaction - presumably because
    // they do not take one (the callers' lambdas for them ignore the tx argument); confirm.
    var skipTxCheck = op == ClientOp.StreamerBatchSend
        || op == ClientOp.ComputeExecuteColocated
        || op == ClientOp.StreamerWithReceiverBatchSend;

    if (skipTxCheck)
    {
        return;
    }

    await AssertOpOnNodeInner(action, op, node, node2, allowExtraOps, withTx: true);
}
/// <summary>
/// Clears the recorded operations of the given servers, runs <paramref name="action"/> once and checks
/// what the servers recorded afterwards.
/// </summary>
/// <param name="action">Operation under test; receives a new <see cref="LazyTransaction"/> or null.</param>
/// <param name="op">Operation code expected on <paramref name="node"/>.</param>
/// <param name="node">Server expected to record the operation.</param>
/// <param name="node2">Optional server that must have recorded no operations.</param>
/// <param name="allowExtraOps">
/// When true, <paramref name="node"/> only has to contain <paramref name="op"/>;
/// otherwise its recorded operations must match exactly.
/// </param>
/// <param name="withTx">
/// When true, the action runs with a transaction and a <see cref="ClientOp.TxBegin"/>
/// is expected before <paramref name="op"/>.
/// </param>
private static async Task AssertOpOnNodeInner(
    Func<ITransaction?, Task> action,
    ClientOp op,
    FakeServer node,
    FakeServer? node2 = null,
    bool allowExtraOps = false,
    bool withTx = false)
{
    node.ClearOps();
    node2?.ClearOps();

    // The transaction created here is not committed or rolled back by this helper.
    ITransaction? tx = null;
    if (withTx)
    {
        tx = new LazyTransaction(default, 0);
    }

    await action(tx);

    if (allowExtraOps)
    {
        CollectionAssert.Contains(node.ClientOps, op);
    }
    else
    {
        var expectedOps = withTx
            ? new[] { ClientOp.TxBegin, op }
            : new[] { op };

        Assert.AreEqual(expectedOps, node.ClientOps);
    }

    if (node2 is not null)
    {
        CollectionAssert.IsEmpty(node2.ClientOps);
    }
}
/// <summary>
/// Shared scenario: checks that <paramref name="op"/> is recorded by <c>_server2</c> under the default
/// assignment, reverses the assignment, and checks that it is then recorded by <c>_server1</c>.
/// </summary>
/// <param name="func">Operation under test; receives the record view and the transaction to use, or null.</param>
/// <param name="op">Operation code that <paramref name="func"/> is expected to produce.</param>
private async Task TestClientReceivesPartitionAssignmentUpdates(Func<IRecordView<int>, ITransaction?, Task> func, ClientOp op)
{
    using var client = await GetClient();
    var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
    var view = table!.GetRecordView<int>();

    Task Invoke(ITransaction? tx) => func(view, tx);

    // Check default assignment.
    await view.UpsertAsync(null, 1);
    await AssertOpOnNode(Invoke, op, _server2);

    // Update assignment - first request receives update flag.
    ReversePartitionAssignment();
    await client.Tables.GetTablesAsync();

    // Second request loads and uses new assignment.
    await AssertOpOnNode(Invoke, op, _server1, allowExtraOps: true);
}
/// <summary>
/// Clears the recorded operations of both servers, reverses each server's partition assignment
/// and stamps both with the same current UTC timestamp.
/// </summary>
private void ReversePartitionAssignment()
{
    // One timestamp is taken up front so that both servers report the same value.
    var newTimestamp = DateTime.UtcNow.Ticks;

    Apply(_server1);
    Apply(_server2);

    void Apply(FakeServer server)
    {
        server.ClearOps();
        server.PartitionAssignment = Enumerable.Reverse(server.PartitionAssignment).ToArray();
        server.PartitionAssignmentTimestamp = newTimestamp;
    }
}
/// <summary>
/// Starts a client configured with the endpoints of both fake servers and waits until
/// it has two connections.
/// </summary>
/// <returns>Connected client; the caller is responsible for disposing it.</returns>
private async Task<IIgniteClient> GetClient()
{
    var cfg = new IgniteClientConfiguration
    {
        Endpoints =
        {
            // Endpoints are written as "host:port" without whitespace around the colon.
            "127.0.0.1:" + _server1.Port,
            "127.0.0.1:" + _server2.Port
        }
    };

    var client = await IgniteClient.StartAsync(cfg);

    try
    {
        client.WaitForConnections(2);
    }
    catch
    {
        // Do not leak the started client when the connections are not established.
        client.Dispose();
        throw;
    }

    return client;
}
// ReSharper disable NotAccessedPositionalProperty.Local
/// <summary>
/// Key with a string part and a Guid part; used with the composite key and custom colocation key tables.
/// </summary>
private record CompositeKey(string IdStr, Guid IdGuid);
/// <summary>
/// Key with a single integer part; used as the object key of a colocated compute job target.
/// </summary>
private record SimpleKey(int Id);
}