blob: 13a1ac30586c5cf74d629c5c0c8f1d8f7a82a1c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Tests
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Ignite.Compute;
using Ignite.Sql;
using Internal.Buffers;
using Internal.Common;
using Internal.Network;
using Internal.Proto;
using Internal.Proto.BinaryTuple;
using Internal.Proto.MsgPack;
using MessagePack;
using Network;
/// <summary>
/// Fake Ignite server for test purposes.
/// </summary>
public sealed class FakeServer : IgniteServerBase
{
public const string Err = "Err!";
public const string ExistingTableName = "tbl1";
public const string CompositeKeyTableName = "tbl2";
public const string CustomColocationKeyTableName = "tbl3";
public const string GetDetailsJob = "get-details";
private const int ExistingTableId = 1001;
private const int CompositeKeyTableId = 1002;
private const int CustomColocationKeyTableId = 1003;
private readonly Func<RequestContext, bool> _shouldDropConnection;
private readonly ConcurrentQueue<ClientOp>? _ops;
public FakeServer(bool disableOpsTracking)
: this(null, disableOpsTracking: disableOpsTracking)
{
// No-op.
}
public FakeServer(bool disableOpsTracking, string nodeName = "fake-server")
: this(null, nodeName, disableOpsTracking: disableOpsTracking)
{
// No-op.
}
internal FakeServer(
Func<RequestContext, bool>? shouldDropConnection = null,
string nodeName = "fake-server",
bool disableOpsTracking = false)
{
_shouldDropConnection = shouldDropConnection ?? (_ => false);
Node = new ClusterNode("id-" + nodeName, nodeName, (IPEndPoint)Listener.LocalEndPoint!);
PartitionAssignment = new[] { Node.Id };
if (!disableOpsTracking)
{
_ops = new();
}
}
public IClusterNode Node { get; }
public Guid ClusterId { get; set; }
public string ClusterName { get; set; } = "fake-cluster";
public string[] PartitionAssignment { get; set; }
public long PartitionAssignmentTimestamp { get; set; }
public TimeSpan HandshakeDelay { get; set; }
public TimeSpan OperationDelay { get; set; }
public TimeSpan MultiRowOperationDelayPerRow { get; set; }
public TimeSpan HeartbeatDelay { get; set; }
public string LastSql { get; set; } = string.Empty;
public long? LastSqlTimeoutMs { get; set; }
public int? LastSqlPageSize { get; set; }
public long? LastSqlTxId { get; set; }
public Dictionary<string, object?> LastSqlScriptProps { get; private set; } = new();
public long StreamerRowCount { get; set; }
public long DroppedConnectionCount { get; set; }
public bool SendInvalidMagic { get; set; }
public int RequestCount { get; set; }
public long ObservableTimestamp { get; set; }
public long LastClientObservableTimestamp { get; set; }
internal IList<ClientOp> ClientOps => _ops?.ToList() ?? throw new Exception("Ops tracking is disabled");
public async Task<IIgniteClient> ConnectClientAsync(IgniteClientConfiguration? cfg = null)
{
cfg ??= new IgniteClientConfiguration();
cfg.Endpoints.Clear();
cfg.Endpoints.Add(Endpoint);
return await IgniteClient.StartAsync(cfg);
}
public void ClearOps() => _ops?.Clear();
protected override void Handle(Socket handler, CancellationToken cancellationToken)
{
// Read handshake.
using var magic = ReceiveBytes(handler, 4);
var msgSize = ReceiveMessageSize(handler);
using var handshake = ReceiveBytes(handler, msgSize);
// Write handshake response.
handler.Send(SendInvalidMagic ? ProtoCommon.MagicBytes.Reverse().ToArray() : ProtoCommon.MagicBytes);
Thread.Sleep(HandshakeDelay);
using var handshakeBufferWriter = new PooledArrayBuffer();
var handshakeWriter = handshakeBufferWriter.MessageWriter;
// Version.
handshakeWriter.Write(3);
handshakeWriter.Write(0);
handshakeWriter.Write(0);
handshakeWriter.WriteNil(); // Success
handshakeWriter.Write(0); // Idle timeout.
handshakeWriter.Write(Node.Id); // Node id.
handshakeWriter.Write(Node.Name); // Node name (consistent id).
handshakeWriter.Write(ClusterId);
handshakeWriter.Write(ClusterName);
handshakeWriter.Write(ObservableTimestamp);
// Cluster version.
handshakeWriter.Write(1);
handshakeWriter.Write(2);
handshakeWriter.Write(3);
handshakeWriter.Write(4);
handshakeWriter.Write("-abcd");
handshakeWriter.WriteBinaryHeader(0); // Features.
handshakeWriter.Write(0); // Extensions.
var handshakeMem = handshakeBufferWriter.GetWrittenMemory();
handler.Send(new byte[] { 0, 0, 0, (byte)handshakeMem.Length }); // Size.
handler.Send(handshakeMem.Span);
while (!cancellationToken.IsCancellationRequested)
{
msgSize = ReceiveMessageSize(handler);
using var msg = ReceiveBytes(handler, msgSize);
if (OperationDelay > TimeSpan.Zero)
{
Thread.Sleep(OperationDelay);
}
var reader = msg.GetReader();
var opCode = (ClientOp)reader.ReadInt32();
var requestId = reader.ReadInt64();
if (_shouldDropConnection(new RequestContext(++RequestCount, opCode, requestId)))
{
DroppedConnectionCount++;
break;
}
_ops?.Enqueue(opCode);
switch (opCode)
{
case ClientOp.TablesGet:
// Zero tables.
Send(handler, requestId, new byte[] { 0 }.AsMemory());
continue;
case ClientOp.TableGet:
{
var tableName = reader.ReadString();
var tableId = tableName switch
{
ExistingTableName => ExistingTableId,
CompositeKeyTableName => CompositeKeyTableId,
CustomColocationKeyTableName => CustomColocationKeyTableId,
_ => default
};
if (tableId != default)
{
using var arrayBufferWriter = new PooledArrayBuffer();
arrayBufferWriter.MessageWriter.Write(tableId);
Send(handler, requestId, arrayBufferWriter);
continue;
}
break;
}
case ClientOp.SchemasGet:
GetSchemas(reader, handler, requestId);
continue;
case ClientOp.PartitionAssignmentGet:
{
using var arrayBufferWriter = new PooledArrayBuffer();
var writer = new MsgPackWriter(arrayBufferWriter);
writer.Write(PartitionAssignment.Length);
writer.Write(true); // Assignment available.
writer.Write(DateTime.UtcNow.Ticks); // Timestamp
foreach (var nodeId in PartitionAssignment)
{
writer.Write(nodeId);
}
Send(handler, requestId, arrayBufferWriter);
continue;
}
case ClientOp.TupleUpsert:
Send(handler, requestId, ReadOnlyMemory<byte>.Empty);
continue;
case ClientOp.TupleInsert:
case ClientOp.TupleReplace:
case ClientOp.TupleReplaceExact:
case ClientOp.TupleDelete:
case ClientOp.TupleDeleteExact:
case ClientOp.TupleContainsKey:
Send(handler, requestId, new byte[] { 1, MessagePackCode.True }.AsMemory());
continue;
case ClientOp.TupleGet:
case ClientOp.TupleGetAndDelete:
case ClientOp.TupleGetAndReplace:
case ClientOp.TupleGetAndUpsert:
Send(handler, requestId, new byte[] { 1, MessagePackCode.Nil }.AsMemory());
continue;
case ClientOp.TupleGetAll:
case ClientOp.TupleInsertAll:
case ClientOp.TupleUpsertAll:
case ClientOp.TupleDeleteAll:
case ClientOp.TupleDeleteAllExact:
reader.Skip(3);
var count = reader.ReadInt32();
if (MultiRowOperationDelayPerRow > TimeSpan.Zero)
{
Thread.Sleep(MultiRowOperationDelayPerRow * count);
}
Send(handler, requestId, new byte[] { 1, 0 }.AsMemory());
continue;
case ClientOp.TxBegin:
reader.Skip(); // Read only.
LastClientObservableTimestamp = reader.ReadInt64();
Send(handler, requestId, new byte[] { 0 }.AsMemory());
continue;
case ClientOp.ComputeExecute:
case ClientOp.ComputeExecuteColocated:
{
using var pooledArrayBuffer = ComputeExecute(reader, colocated: opCode == ClientOp.ComputeExecuteColocated);
using var resWriter = new PooledArrayBuffer();
var rw = resWriter.MessageWriter;
if (opCode == ClientOp.ComputeExecuteColocated)
{
// Schema version.
rw.Write(1);
}
rw.Write(Guid.NewGuid());
Send(handler, requestId, resWriter);
Send(handler, requestId, pooledArrayBuffer, isNotification: true);
continue;
}
case ClientOp.SqlExec:
SqlExec(handler, requestId, reader);
continue;
case ClientOp.SqlCursorNextPage:
SqlCursorNextPage(handler, requestId);
continue;
case ClientOp.SqlExecScript:
SqlExecScript(reader);
Send(handler, requestId, Array.Empty<byte>());
continue;
case ClientOp.Heartbeat:
Thread.Sleep(HeartbeatDelay);
Send(handler, requestId, Array.Empty<byte>());
continue;
case ClientOp.StreamerBatchSend:
reader.Skip(4);
StreamerRowCount += reader.ReadInt32();
Send(handler, requestId, Array.Empty<byte>());
continue;
}
// Fake error message for any other op code.
using var errWriter = new PooledArrayBuffer();
var w = new MsgPackWriter(errWriter);
w.Write(Guid.Empty);
w.Write(262150);
w.Write("org.foo.bar.BazException");
w.Write(Err);
w.WriteNil(); // Stack trace.
w.WriteNil(); // Error extensions.
Send(handler, requestId, errWriter, isError: true);
}
handler.Disconnect(true);
}
private void Send(Socket socket, long requestId, PooledArrayBuffer writer, bool isError = false, bool isNotification = false)
=> Send(socket, requestId, writer.GetWrittenMemory(), isError, isNotification);
private void Send(Socket socket, long requestId, ReadOnlyMemory<byte> payload, bool isError = false, bool isNotification = false)
{
using var header = new PooledArrayBuffer();
var writer = new MsgPackWriter(header);
writer.Write(requestId);
var flags = (int)ResponseFlags.PartitionAssignmentChanged;
if (isError)
{
flags |= (int)ResponseFlags.Error;
}
if (isNotification)
{
flags |= (int)ResponseFlags.Notification;
}
writer.Write(flags);
writer.Write(PartitionAssignmentTimestamp);
writer.Write(ObservableTimestamp); // Observable timestamp.
var headerMem = header.GetWrittenMemory();
var size = BitConverter.GetBytes(IPAddress.HostToNetworkOrder(headerMem.Length + payload.Length));
socket.Send(size);
socket.Send(headerMem.Span);
if (!payload.IsEmpty)
{
socket.Send(payload.Span);
}
}
private void SqlCursorNextPage(Socket handler, long requestId)
{
using var arrayBufferWriter = new PooledArrayBuffer();
var writer = new MsgPackWriter(arrayBufferWriter);
writer.Write(500); // Page size.
for (int i = 0; i < 500; i++)
{
using var tuple = new BinaryTupleBuilder(1);
tuple.AppendInt(i + 512);
writer.Write(tuple.Build().Span);
}
writer.Write(false); // Has next.
Send(handler, requestId, arrayBufferWriter);
}
private void SqlExec(Socket handler, long requestId, MsgPackReader reader)
{
var props = new Dictionary<string, object?>();
// ReSharper disable RedundantCast (does not build on older SDKs)
var txId = reader.TryReadNil() ? (long?)null : reader.ReadInt64();
props["txId"] = txId;
props["schema"] = reader.TryReadNil() ? null : reader.ReadString();
var pageSize = reader.TryReadNil() ? (int?)null : reader.ReadInt32();
props["pageSize"] = pageSize;
var timeoutMs = reader.TryReadNil() ? (long?)null : reader.ReadInt64();
props["timeoutMs"] = timeoutMs;
props["sessionTimeoutMs"] = reader.TryReadNil() ? (long?)null : reader.ReadInt64();
// ReSharper restore RedundantCast
var propCount = reader.ReadInt32();
var propTuple = new BinaryTupleReader(reader.ReadBinary(), propCount * 4);
for (int i = 0; i < propCount; i++)
{
var idx = i * 4;
var name = propTuple.GetString(idx);
var type = (ColumnType)propTuple.GetInt(idx + 1);
var scale = propTuple.GetInt(idx + 2);
props[name] = propTuple.GetObject(idx + 3, type, scale);
}
var sql = reader.ReadString();
props["sql"] = sql;
if (!reader.TryReadNil())
{
var argCount = reader.ReadInt32();
if (argCount > 0)
{
reader.Skip();
}
}
LastClientObservableTimestamp = reader.ReadInt64();
LastSql = sql;
LastSqlPageSize = pageSize;
LastSqlTimeoutMs = timeoutMs;
LastSqlTxId = txId;
using var arrayBufferWriter = new PooledArrayBuffer();
var writer = new MsgPackWriter(arrayBufferWriter);
writer.Write(1); // ResourceId.
if (sql == "SELECT PROPS")
{
writer.Write(true); // HasRowSet.
writer.Write(false); // hasMore.
writer.Write(false); // WasApplied.
writer.Write(0); // AffectedRows.
writer.Write(2); // Meta.
writer.Write(6); // Column props.
writer.Write("NAME"); // Column name.
writer.Write(false); // Nullable.
writer.Write((int)ColumnType.String);
writer.Write(0); // Scale.
writer.Write(0); // Precision.
writer.Write(false); // No origin.
writer.Write(6); // Column props.
writer.Write("VAL"); // Column name.
writer.Write(false); // Nullable.
writer.Write((int)ColumnType.String);
writer.Write(0); // Scale.
writer.Write(0); // Precision.
writer.Write(false); // No origin.
writer.Write(props.Count); // Page size.
foreach (var (key, val) in props)
{
using var tuple = new BinaryTupleBuilder(2);
tuple.AppendString(key);
tuple.AppendString(val?.ToString() ?? string.Empty);
writer.Write(tuple.Build().Span);
}
}
else
{
writer.Write(true); // HasRowSet.
writer.Write(true); // hasMore.
writer.Write(false); // WasApplied.
writer.Write(0); // AffectedRows.
writer.Write(1); // Meta.
writer.Write(6); // Column props.
writer.Write("ID"); // Column name.
writer.Write(false); // Nullable.
writer.Write((int)ColumnType.Int32);
writer.Write(0); // Scale.
writer.Write(0); // Precision.
writer.Write(false); // No origin.
writer.Write(512); // Page size.
for (int i = 0; i < 512; i++)
{
using var tuple = new BinaryTupleBuilder(1);
tuple.AppendInt(i);
writer.Write(tuple.Build().Span);
}
}
Send(handler, requestId, arrayBufferWriter);
}
private void SqlExecScript(MsgPackReader reader)
{
var props = new Dictionary<string, object?>
{
["schema"] = reader.TryReadNil() ? null : reader.ReadString(),
["pageSize"] = reader.TryReadNil() ? null : reader.ReadInt32(),
["timeoutMs"] = reader.TryReadNil() ? null : reader.ReadInt64(),
["sessionTimeoutMs"] = reader.TryReadNil() ? null : reader.ReadInt64()
};
var propCount = reader.ReadInt32();
var propTuple = new BinaryTupleReader(reader.ReadBinary(), propCount * 4);
for (int i = 0; i < propCount; i++)
{
var idx = i * 4;
var name = propTuple.GetString(idx);
var type = (ColumnType)propTuple.GetInt(idx + 1);
var scale = propTuple.GetInt(idx + 2);
props[name] = propTuple.GetObject(idx + 3, type, scale);
}
var sql = reader.ReadString();
props["sql"] = sql;
LastSqlScriptProps = props;
}
private void GetSchemas(MsgPackReader reader, Socket handler, long requestId)
{
var tableId = reader.ReadInt32();
using var arrayBufferWriter = new PooledArrayBuffer();
var writer = new MsgPackWriter(arrayBufferWriter);
writer.Write(1);
writer.Write(1); // Version.
if (tableId == ExistingTableId)
{
writer.Write(1); // Columns.
writer.Write(7); // Column props.
writer.Write("ID");
writer.Write((int)ColumnType.Int32);
writer.Write(0); // Key index.
writer.Write(false); // Nullable.
writer.Write(0); // Colocation index.
writer.Write(0); // Scale.
writer.Write(0); // Precision.
}
else if (tableId == CompositeKeyTableId)
{
writer.Write(2); // Columns.
writer.Write(7); // Column props.
writer.Write("IdStr");
writer.Write((int)ColumnType.String);
writer.Write(0); // Key index.
writer.Write(false); // Nullable.
writer.Write(0); // Colocation index.
writer.Write(0); // Scale.
writer.Write(0); // Precision.
writer.Write(7); // Column props.
writer.Write("IdGuid");
writer.Write((int)ColumnType.Uuid);
writer.Write(1); // Key index.
writer.Write(false); // Nullable.
writer.Write(1); // Colocation index.
writer.Write(0); // Scale.
writer.Write(0); // Precision.
}
else if (tableId == CustomColocationKeyTableId)
{
writer.Write(2); // Columns.
writer.Write(7); // Column props.
writer.Write("IdStr");
writer.Write((int)ColumnType.String);
writer.Write(0); // Key index.
writer.Write(false); // Nullable.
writer.Write(0); // Colocation index.
writer.Write(0); // Scale.
writer.Write(0); // Precision.
writer.Write(7); // Column props.
writer.Write("IdGuid");
writer.Write((int)ColumnType.Uuid);
writer.Write(1); // Key index.
writer.Write(false); // Nullable.
writer.Write(-1); // Colocation index.
writer.Write(0); // Scale.
writer.Write(0); // Precision.
}
Send(handler, requestId, arrayBufferWriter);
}
private PooledArrayBuffer ComputeExecute(MsgPackReader reader, bool colocated = false)
{
// Colocated: table id, schema version, key.
// Else: node names.
if (colocated)
{
reader.Skip(4);
}
else
{
var namesCount = reader.ReadInt32();
for (int i = 0; i < namesCount; i++)
{
reader.ReadString();
}
}
var unitsCount = reader.TryReadNil() ? 0 : reader.ReadInt32();
var units = new List<DeploymentUnit>(unitsCount);
for (int i = 0; i < unitsCount; i++)
{
units.Add(new DeploymentUnit(reader.ReadString(), reader.ReadString()));
}
var jobClassName = reader.ReadString();
var priority = reader.ReadInt32();
var maxRetries = reader.ReadInt64();
object? resObj = jobClassName == GetDetailsJob
? new
{
NodeName = Node.Name,
Units = units.Select(u => $"{u.Name}|{u.Version}").StringJoin(),
jobClassName,
priority,
maxRetries
}.ToString()
: Node.Name;
using var builder = new BinaryTupleBuilder(3);
builder.AppendObjectWithType(resObj);
var arrayBufferWriter = new PooledArrayBuffer();
var writer = new MsgPackWriter(arrayBufferWriter);
writer.Write(builder.Build().Span);
// Status
writer.Write(Guid.NewGuid());
writer.Write(0); // State.
writer.Write(0L); // Create time.
writer.Write(0);
writer.WriteNil(); // Start time.
writer.WriteNil(); // Finish time.
return arrayBufferWriter;
}
internal record struct RequestContext(int RequestCount, ClientOp OpCode, long RequestId);
}
}