blob: 3bbb2f3aa985c0720a69eb1cafc047158762d768 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Tests;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Internal.Proto;
/// <summary>
/// Proxy for Ignite server with request logging and interception.
/// Provides a way to test request routing while using real Ignite cluster.
/// <para />
/// Every client request that passes through the proxy has its leading int32 recorded as a
/// <see cref="ClientOp"/>; the recorded sequence is exposed via <see cref="ClientOps"/>.
/// </summary>
public sealed class IgniteProxy : IgniteServerBase
{
    /// <summary>Number of bytes in the protocol magic exchanged at the start of the handshake.</summary>
    private const int MagicSize = 4;

    /// <summary>Connection to the target (real) server; all handled client traffic is relayed through it.</summary>
    private readonly Socket _socket;

    /// <summary>Operation codes observed in client requests, in arrival order.</summary>
    private readonly ConcurrentQueue<ClientOp> _ops = new();

    /// <summary>
    /// Initializes a new instance of the <see cref="IgniteProxy"/> class and connects to the target server.
    /// </summary>
    /// <param name="targetEndpoint">Endpoint of the server to forward traffic to.</param>
    /// <param name="nodeName">Name of the proxied node.</param>
    public IgniteProxy(EndPoint targetEndpoint, string nodeName)
    {
        TargetEndpoint = targetEndpoint;
        NodeName = nodeName;

        _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        try
        {
            _socket.Connect(targetEndpoint);
        }
        catch
        {
            // The instance is never handed to the caller when the constructor throws,
            // so nobody else can dispose the socket: release it here to avoid a leak.
            _socket.Dispose();
            throw;
        }
    }

    /// <summary>Gets the name of the proxied node.</summary>
    public string NodeName { get; private init; }

    /// <summary>Gets the endpoint of the server this proxy forwards to.</summary>
    public EndPoint TargetEndpoint { get; private init; }

    /// <summary>Gets a snapshot copy of the client operations recorded so far.</summary>
    internal IList<ClientOp> ClientOps => _ops.ToList();

    /// <summary>Removes all recorded client operations.</summary>
    public void ClearOps() => _ops.Clear();

    /// <summary>
    /// Relays the handshake between the client and the target server, then relays messages
    /// in both directions until cancellation is requested or a relay fails.
    /// </summary>
    /// <param name="handler">Socket of the connected client.</param>
    /// <param name="cancellationToken">Token that stops the relay loops.</param>
    protected override void Handle(Socket handler, CancellationToken cancellationToken)
    {
        // Handshake.
        {
            // Forward magic from client to server.
            using var magic = ReceiveBytes(handler, MagicSize);
            _socket.Send(magic.AsMemory().Span);

            // Receive handshake from client and forward it to server.
            var msgSize = ReceiveMessageSize(handler);
            using var msg = ReceiveBytes(handler, msgSize);
            SendMessage(_socket, msgSize, msg.AsMemory().Span);

            // Forward magic from server to client.
            using var serverMagic = ReceiveBytes(_socket, MagicSize);
            handler.Send(serverMagic.AsMemory().Span);

            // Receive handshake from server and forward it to client.
            var serverMsgSize = ReceiveMessageSize(_socket);
            using var serverMsg = ReceiveBytes(_socket, serverMsgSize);
            SendMessage(handler, serverMsgSize, serverMsg.AsMemory().Span);
        }

        // Separate relay loops for each direction: don't block heartbeats while some request is being processed.
        // Client -> Server.
        var clientToServerRelay = Task.Run(
            () =>
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        // Receive from client.
                        var msgSize = ReceiveMessageSize(handler);
                        using var msg = ReceiveBytes(handler, msgSize);

                        // The first int32 of a request is recorded as the operation code.
                        _ops.Enqueue((ClientOp)msg.GetReader().ReadInt32());

                        // Forward to server.
                        SendMessage(_socket, msgSize, msg.AsMemory().Span);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"Error in IgniteProxy Client -> Server relay (lastOp = {DescribeLastOp()}): {e}");
                        throw;
                    }
                }
            },
            cancellationToken);

        // Server -> Client.
        var serverToClientRelay = Task.Run(
            () =>
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        // Receive from server.
                        var serverMsgSize = ReceiveMessageSize(_socket);
                        using var serverMsg = ReceiveBytes(_socket, serverMsgSize);

                        // Forward to client.
                        SendMessage(handler, serverMsgSize, serverMsg.AsMemory().Span);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"Error in IgniteProxy Server -> Client relay (lastOp = {DescribeLastOp()}): {e}");
                        throw;
                    }
                }
            },
            cancellationToken);

        Task.WhenAll(clientToServerRelay, serverToClientRelay).Wait(cancellationToken);

        handler.Disconnect(true);
    }

    /// <summary>
    /// Disposes the base server and, when <paramref name="disposing"/> is true, the connection to the target server.
    /// </summary>
    /// <param name="disposing">True when called from Dispose; false otherwise.</param>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        if (disposing)
        {
            _socket.Dispose();
        }
    }

    /// <summary>
    /// Sends a message to <paramref name="target"/>: the size as an int32 in network byte order, then the payload.
    /// </summary>
    /// <param name="target">Socket to write to.</param>
    /// <param name="size">Size value to write as the prefix.</param>
    /// <param name="payload">Message bytes to write after the prefix.</param>
    private static void SendMessage(Socket target, int size, ReadOnlySpan<byte> payload)
    {
        target.Send(BitConverter.GetBytes(IPAddress.HostToNetworkOrder(size)));
        target.Send(payload);
    }

    /// <summary>
    /// Returns the most recently recorded client operation for diagnostics, or "none" when nothing is recorded.
    /// </summary>
    /// <returns>Text describing the last recorded operation.</returns>
    private string DescribeLastOp()
    {
        // Work on a snapshot: the queue may be empty (no request seen yet) or cleared concurrently
        // by ClearOps, and this method is called from catch blocks where a secondary exception
        // would replace the original error.
        var snapshot = _ops.ToArray();

        return snapshot.Length == 0 ? "none" : snapshot[^1].ToString();
    }
}