blob: bb2bdbf5e07ea5d7cac28be4099339f51b4e50b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Impl.Client
{
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Log;
using Apache.Ignite.Core.Log;
/// <summary>
/// Wrapper over framework socket for Ignite thin client operations.
/// </summary>
internal sealed class ClientSocket : IDisposable
{
/** Protocol version 1.0.0: the lowest version accepted when the handshake is retried. */
public static readonly ClientProtocolVersion Ver100 = new ClientProtocolVersion(1, 0, 0);
/** Protocol version 1.1.0: starting from this version the handshake sends authentication data. */
public static readonly ClientProtocolVersion Ver110 = new ClientProtocolVersion(1, 1, 0);
/** Protocol version 1.2.0. */
public static readonly ClientProtocolVersion Ver120 = new ClientProtocolVersion(1, 2, 0);
/** Protocol version 1.4.0: responses carry flags, handshake response carries the server node id. */
public static readonly ClientProtocolVersion Ver140 = new ClientProtocolVersion(1, 4, 0);
/** Protocol version 1.5.0. */
// This version is reserved for IEP-34 Thin client: transactions support
// ReSharper disable once UnusedMember.Global
public static readonly ClientProtocolVersion Ver150 = new ClientProtocolVersion(1, 5, 0);
/** Protocol version 1.6.0: starting from this version incoming messages are checked for the notification flag. */
public static readonly ClientProtocolVersion Ver160 = new ClientProtocolVersion(1, 6, 0);
/** Protocol version 1.7.0: starting from this version the handshake exchanges feature flags. */
public static readonly ClientProtocolVersion Ver170 = new ClientProtocolVersion(1, 7, 0);
/** Current (highest supported) protocol version; used when no explicit version is requested. */
public static readonly ClientProtocolVersion CurrentProtocolVersion = Ver170;
/** Handshake opcode: the first byte written by the handshake request. */
private const byte OpHandshake = 1;
/** Client type code written by the handshake request. */
private const byte ClientType = 2;
/** Underlying socket. */
private readonly Socket _socket;
/** Underlying socket stream; all reads and writes go through it (see SocketRead, SocketWrite). */
private readonly Stream _stream;
/** Operation timeout, taken from IgniteClientConfiguration.SocketTimeout. */
private readonly TimeSpan _timeout;
/** Timer that periodically runs CheckTimeouts; created only when the timeout is greater than zero. */
private readonly Timer _timeoutCheckTimer;
/** Guard flag: true while CheckTimeouts is running, so that an overlapping timer callback returns early. */
private volatile bool _checkingTimeouts;
/** Whether the stream read timeout is currently enabled; toggled by SendRequest (on) and WaitForMessages (off). */
private bool _isReadTimeoutEnabled;
/** Current async operations, map from request id. A null value marks a request that has timed out. */
private readonly ConcurrentDictionary<long, Request> _requests
= new ConcurrentDictionary<long, Request>();
/** Server -> Client notification listeners, map from notification id. */
private readonly ConcurrentDictionary<long, ClientNotificationHandler> _notificationListeners
= new ConcurrentDictionary<long, ClientNotificationHandler>();
/** Expected notifications counter; a positive value keeps the socket in async mode. */
private long _expectedNotifications;
/** Request id generator; incremented for every request message. */
private long _requestId;
/** Socket failure exception; null means the socket is active. */
private volatile Exception _exception;
/** Lock taken while sending a request (SendRequest, SendRequestAsync). */
private readonly object _sendRequestSyncRoot = new object();
/** Lock taken while receiving a message (SendRequest, WaitForMessages). */
private readonly object _receiveMessageSyncRoot = new object();
/** Background socket receiver trigger: set when an async request is sent or a notification handler is added. */
private readonly ManualResetEventSlim _listenerEvent = new ManualResetEventSlim();
/** Dispose locker: makes Dispose safe to call from multiple threads. */
private readonly object _disposeSyncRoot = new object();
/** Disposed flag; written under the dispose lock. */
private bool _isDisposed;
/** Topology version update callback, invoked when a response carries the topology-changed flag. */
private readonly Action<AffinityTopologyVersion> _topVerCallback;
/** Logger. */
private readonly ILogger _logger;
/** Marshaller. */
private readonly Marshaller _marsh;
/** Features returned by the handshake. */
private readonly ClientFeatures _features;
/// <summary>
/// Initializes a new instance of the <see cref="ClientSocket" /> class:
/// connects to the end point, validates the configuration, performs the protocol handshake,
/// then starts the request timeout checker (when a timeout is configured) and the background receiver.
/// <para />
/// When any step after the connection is established fails, the connection is closed before
/// the exception is propagated, so a failed construction does not leave an open socket behind.
/// </summary>
/// <param name="clientConfiguration">The client configuration.</param>
/// <param name="endPoint">The end point to connect to.</param>
/// <param name="host">The host name (required for SSL).</param>
/// <param name="version">Protocol version; when null, <see cref="CurrentProtocolVersion"/> is used.</param>
/// <param name="topVerCallback">Topology version update callback.</param>
/// <param name="marshaller">Marshaller.</param>
public ClientSocket(IgniteClientConfiguration clientConfiguration, EndPoint endPoint, string host,
    ClientProtocolVersion? version, Action<AffinityTopologyVersion> topVerCallback,
    Marshaller marshaller)
{
    Debug.Assert(clientConfiguration != null);
    Debug.Assert(endPoint != null);
    Debug.Assert(!string.IsNullOrWhiteSpace(host));
    Debug.Assert(topVerCallback != null);
    Debug.Assert(marshaller != null);

    _topVerCallback = topVerCallback;
    _marsh = marshaller;
    _timeout = clientConfiguration.SocketTimeout;
    _logger = (clientConfiguration.Logger ?? NoopLogger.Instance).GetLogger(GetType());

    _socket = Connect(clientConfiguration, endPoint, _logger);

    try
    {
        _stream = GetSocketStream(_socket, clientConfiguration, host);

        ServerVersion = version ?? CurrentProtocolVersion;

        Validate(clientConfiguration);

        _features = Handshake(clientConfiguration, ServerVersion);
    }
    catch
    {
        // The instance never becomes visible to the caller, so nobody else can release the connection.
        // Handshake failures such as rejected credentials or an unsupported version do not go through
        // SocketRead/SocketWrite and therefore do not dispose anything themselves.
        if (_stream != null)
        {
            _stream.Close();
        }

        // Closing an already closed socket is a no-op.
        _socket.Close();

        throw;
    }

    // Check periodically if any request has timed out.
    if (_timeout > TimeSpan.Zero)
    {
        // Minimum Socket timeout is 500ms.
        _timeoutCheckTimer = new Timer(CheckTimeouts, null, _timeout, TimeSpan.FromMilliseconds(500));
    }

    // Continuously and asynchronously wait for data from server.
    // TaskCreationOptions.LongRunning actually means a new thread.
    TaskRunner.Run(WaitForMessages, TaskCreationOptions.LongRunning);
}
/// <summary>
/// Checks that user name and password are either both absent or both present and non-empty.
/// </summary>
/// <param name="cfg">Configuration to check.</param>
/// <exception cref="IgniteClientException">The credentials are incomplete or empty.</exception>
private static void Validate(IgniteClientConfiguration cfg)
{
    var hasUserName = cfg.UserName != null;
    var hasPassword = cfg.Password != null;

    // User name checks come first, password checks second.
    if (hasUserName && cfg.UserName.Length == 0)
    {
        throw new IgniteClientException("IgniteClientConfiguration.Username cannot be empty.");
    }

    if (hasUserName && !hasPassword)
    {
        throw new IgniteClientException("IgniteClientConfiguration.Password cannot be null when Username is set.");
    }

    if (hasPassword && cfg.Password.Length == 0)
    {
        throw new IgniteClientException("IgniteClientConfiguration.Password cannot be empty.");
    }

    if (hasPassword && !hasUserName)
    {
        throw new IgniteClientException("IgniteClientConfiguration.UserName cannot be null when Password is set.");
    }
}
/// <summary>
/// Sends a request and blocks until the response has been received and decoded.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="writeAction">Writes the request payload; may be null.</param>
/// <param name="readFunc">Reads the result from a successful response; may be null.</param>
/// <param name="errorFunc">Optional handler that converts an error response into a result.</param>
/// <returns>Decoded operation result.</returns>
public T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
    Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
{
    var request = WriteMessage(writeAction, opId);

    // Try the synchronous path first; it returns null when the socket is in async mode or busy.
    var responseStream = SendRequest(ref request);

    if (responseStream == null)
    {
        // Fall back to the async path and block on its result.
        responseStream = SendRequestAsync(ref request).Result;
    }

    return DecodeResponse(responseStream, readFunc, errorFunc);
}
/// <summary>
/// Sends a request and returns a task that completes with the decoded response.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="writeAction">Writes the request payload; may be null.</param>
/// <param name="readFunc">Reads the result from a successful response; may be null.</param>
/// <param name="errorFunc">Optional handler that converts an error response into a result.</param>
/// <param name="syncCallback">When true, the response is decoded synchronously
/// as part of completing the response task.</param>
/// <returns>Task with the decoded operation result.</returns>
public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
    Func<ClientResponseContext, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null,
    bool syncCallback = false)
{
    var request = WriteMessage(writeAction, opId);
    var pendingResponse = SendRequestAsync(ref request);

    if (!syncCallback)
    {
        // NOTE: ContWith explicitly uses TaskScheduler.Default,
        // which runs DecodeResponse (and any user continuations) on a thread pool thread,
        // so that WaitForMessages thread does not do anything except reading from the socket.
        return pendingResponse.ContWith(t => DecodeResponse(t.Result, readFunc, errorFunc));
    }

    return pendingResponse.ContinueWith(t => DecodeResponse(t.Result, readFunc, errorFunc),
        TaskContinuationOptions.ExecuteSynchronously);
}
/// <summary>
/// Enables notifications on this socket by atomically incrementing the expected notifications counter.
/// While the counter is positive the socket stays in async mode (see <see cref="IsAsyncMode"/>),
/// so the background receiver keeps reading from the socket.
/// The counter is decremented by <see cref="RemoveNotificationHandler"/>.
/// </summary>
public void ExpectNotifications()
{
Interlocked.Increment(ref _expectedNotifications);
}
/// <summary>
/// Registers a handler for notifications with the given id and wakes up the background receiver.
/// </summary>
/// <param name="notificationId">Notification id.</param>
/// <param name="handlerDelegate">Handler delegate.</param>
public void AddNotificationHandler(long notificationId, ClientNotificationHandler.Handler handlerDelegate)
{
    var registered = _notificationListeners.GetOrAdd(
        notificationId,
        id => new ClientNotificationHandler(_logger, handlerDelegate));

    // An entry without a delegate may already exist for this id (HandleNotification creates one
    // when a notification arrives first). Attach the delegate to it here.
    // We could use AddOrUpdate, but SetHandler must be called outside of Update call,
    // because it causes handler execution, which, in turn, may call RemoveNotificationHandler.
    if (!registered.HasHandler)
    {
        registered.SetHandler(handlerDelegate);
    }

    _listenerEvent.Set();
}
/// <summary>
/// Removes a notification handler with the given id and decrements the expected notifications counter.
/// Does nothing when the socket is already disposed.
/// </summary>
/// <param name="notificationId">Notification id.</param>
/// <exception cref="IgniteClientException">The expected notifications counter became negative.</exception>
public void RemoveNotificationHandler(long notificationId)
{
if (IsDisposed)
{
return;
}
ClientNotificationHandler unused;
var removed = _notificationListeners.TryRemove(notificationId, out unused);
// A handler is expected to be registered for the id; this is only verified in debug builds.
Debug.Assert(removed);
var count = Interlocked.Decrement(ref _expectedNotifications);
if (count < 0)
{
throw new IgniteClientException("Negative thin client expected notification count.");
}
}
/// <summary>
/// Gets the features established by the handshake.
/// </summary>
public ClientFeatures Features
{
    get
    {
        return _features;
    }
}

/// <summary>
/// Gets the remote EndPoint of the underlying socket.
/// </summary>
public EndPoint RemoteEndPoint
{
    get
    {
        return _socket.RemoteEndPoint;
    }
}

/// <summary>
/// Gets the local EndPoint of the underlying socket.
/// </summary>
public EndPoint LocalEndPoint
{
    get
    {
        return _socket.LocalEndPoint;
    }
}

/// <summary>
/// Gets the ID of the connected server node. Set by the handshake for protocol 1.4.0 and later.
/// </summary>
public Guid? ServerNodeId { get; private set; }

/// <summary>
/// Gets a value indicating whether this socket is disposed.
/// </summary>
public bool IsDisposed
{
    get
    {
        return _isDisposed;
    }
}

/// <summary>
/// Gets the server protocol version.
/// </summary>
public ClientProtocolVersion ServerVersion { get; private set; }

/// <summary>
/// Gets the marshaller.
/// </summary>
public Marshaller Marshaller
{
    get
    {
        return _marsh;
    }
}

/// <summary>
/// Gets a value indicating that this socket is in async mode:
/// async requests are pending, or notifications are expected.
/// <para />
/// We have sync and async modes because sync mode is faster.
/// </summary>
private bool IsAsyncMode
{
    get
    {
        // Pending requests are checked first; the counter is only read when there are none.
        if (!_requests.IsEmpty)
        {
            return true;
        }

        return Interlocked.Read(ref _expectedNotifications) > 0;
    }
}
/// <summary>
/// Background receiver loop. While the socket is active it waits until the socket enters async mode
/// (pending async requests or expected notifications), then reads one message at a time
/// and dispatches it via <see cref="HandleResponse"/>.
/// <para />
/// Any exception that escapes the loop is treated as a socket failure: it is recorded
/// (unless a failure has been recorded already) and the socket is disposed.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void WaitForMessages()
{
    _logger.Trace("Receiver thread #{0} started.", Thread.CurrentThread.ManagedThreadId);

    try
    {
        // Null exception means active socket.
        while (_exception == null)
        {
            // Do not call Receive if there are no pending async requests or notification listeners.
            while (!IsAsyncMode)
            {
                // Wait with a timeout so we check for disposed state periodically.
                _listenerEvent.Wait(1000);

                if (_exception != null)
                {
                    return;
                }

                _listenerEvent.Reset();
            }

            lock (_receiveMessageSyncRoot)
            {
                // Async operations should not have a read timeout.
                if (_isReadTimeoutEnabled)
                {
                    _stream.ReadTimeout = Timeout.Infinite;
                    _isReadTimeoutEnabled = false;
                }

                var msg = ReceiveMessage();

                HandleResponse(msg);
            }
        }
    }
    catch (Exception ex)
    {
        // Socket failure (connection dropped, etc).
        // Close socket and all pending requests.
        // Note that this does not include request decoding exceptions (failed request, invalid data, etc).
        //
        // Keep the first recorded failure: once the socket has been disposed, this thread can still
        // fail with a secondary error (e.g. ObjectDisposedException from the disposed wait handle),
        // which must not replace the root cause reported by CheckException.
        _exception = _exception ?? ex;
        Dispose();
    }
    finally
    {
        _logger.Trace("Receiver thread #{0} stopped.", Thread.CurrentThread.ManagedThreadId);
    }
}
/// <summary>
/// Dispatches a received message: either to a notification handler,
/// or to the pending request with the matching id.
/// </summary>
/// <param name="response">Message bytes (without the length prefix).</param>
/// <exception cref="IgniteClientException">The id is unknown and the socket has not failed.</exception>
private void HandleResponse(byte[] response)
{
    var responseStream = new BinaryHeapStream(response);
    var id = responseStream.ReadLong();

    if (HandleNotification(id, responseStream))
    {
        return;
    }

    Request pending;

    if (_requests.TryRemove(id, out pending))
    {
        // A null entry means the request has timed out already: nobody waits for the response.
        if (pending != null)
        {
            pending.CompletionSource.TrySetResult(responseStream);
        }

        return;
    }

    // No entry for this id. On a failed socket this is ignored; otherwise it is a protocol error.
    if (_exception == null)
    {
        // Response with unknown id.
        throw new IgniteClientException("Invalid thin client response id: " + id);
    }
}
/// <summary>
/// Checks whether the message in the given stream is a notification and, if so,
/// passes it to the notification handler registered for the id (creating an entry when there is none).
/// </summary>
/// <param name="requestId">Id read from the message.</param>
/// <param name="stream">Stream positioned right after the id.</param>
/// <returns>True when the message was a notification and has been handled; false otherwise.</returns>
private bool HandleNotification(long requestId, BinaryHeapStream stream)
{
    // Notification flag is only checked for protocol 1.6.0 and later.
    if (ServerVersion < Ver160)
    {
        return false;
    }

    // Peek at the flags: read them, then step back so the stream position is unchanged.
    var flags = (ClientFlags) stream.ReadShort();
    stream.Seek(-2, SeekOrigin.Current);

    var isNotification = (flags & ClientFlags.Notification) == ClientFlags.Notification;

    if (isNotification)
    {
        var handler = _notificationListeners.GetOrAdd(requestId, id => new ClientNotificationHandler(_logger));

        handler.Handle(stream, null);
    }

    return isNotification;
}
/// <summary>
/// Decodes the response that we got from <see cref="HandleResponse"/>:
/// reads the status, reports a topology change if flagged, then reads either the result or the error.
/// </summary>
/// <param name="stream">Response stream positioned after the request id.</param>
/// <param name="readFunc">Reads the result from a successful response; may be null.</param>
/// <param name="errorFunc">Optional handler that converts an error response into a result.</param>
/// <returns>Result of <paramref name="readFunc"/> (or default value when it is null) on success;
/// result of <paramref name="errorFunc"/> on error.</returns>
/// <exception cref="IgniteClientException">Error response and no <paramref name="errorFunc"/>.</exception>
private T DecodeResponse<T>(BinaryHeapStream stream, Func<ClientResponseContext, T> readFunc,
    Func<ClientStatusCode, string, T> errorFunc)
{
    ClientStatusCode status;

    if (ServerVersion >= Ver140)
    {
        // 1.4.0+: flags come first; topology version and status code are present only when flagged.
        var flags = (ClientFlags) stream.ReadShort();

        var topologyChanged =
            (flags & ClientFlags.AffinityTopologyChanged) == ClientFlags.AffinityTopologyChanged;

        if (topologyChanged)
        {
            var topVer = new AffinityTopologyVersion(stream.ReadLong(), stream.ReadInt());

            if (_topVerCallback != null)
            {
                _topVerCallback(topVer);
            }
        }

        if ((flags & ClientFlags.Error) == ClientFlags.Error)
        {
            status = (ClientStatusCode) stream.ReadInt();
        }
        else
        {
            status = ClientStatusCode.Success;
        }
    }
    else
    {
        // Older protocol: status code is always present.
        status = (ClientStatusCode) stream.ReadInt();
    }

    if (status != ClientStatusCode.Success)
    {
        var errorMessage = BinaryUtils.Marshaller.StartUnmarshal(stream).ReadString();

        if (errorFunc == null)
        {
            throw new IgniteClientException(errorMessage, null, status);
        }

        return errorFunc(status, errorMessage);
    }

    if (readFunc == null)
    {
        return default(T);
    }

    return readFunc(new ClientResponseContext(stream, this));
}
/// <summary>
/// Performs client protocol handshake: sends the requested protocol version, client type,
/// and (depending on the version) feature flags and credentials, then reads the server reply.
/// <para />
/// On rejection the server reply carries its own protocol version; when that version differs from the
/// requested one and is within the supported range, the handshake is retried with the server version.
/// </summary>
/// <param name="clientConfiguration">Client configuration (source of credentials).</param>
/// <param name="version">Protocol version to request.</param>
/// <returns>Features for the established connection.</returns>
/// <exception cref="IgniteClientException">Authentication failed, or the handshake was rejected
/// and cannot be retried.</exception>
private ClientFeatures Handshake(IgniteClientConfiguration clientConfiguration, ClientProtocolVersion version)
{
// Credentials are sent for 1.1.0+ when a user name is configured; features are sent for 1.7.0+.
var hasAuth = version >= Ver110 && clientConfiguration.UserName != null;
var hasFeatures = version >= Ver170;
// Send request.
int messageLen;
var buf = WriteMessage(stream =>
{
// Handshake.
stream.WriteByte(OpHandshake);
// Protocol version.
stream.WriteShort(version.Major);
stream.WriteShort(version.Minor);
stream.WriteShort(version.Maintenance);
// Client type: platform.
stream.WriteByte(ClientType);
// Writing features.
if (hasFeatures)
{
BinaryUtils.Marshaller.Marshal(stream,
w => w.WriteByteArray(ClientFeatures.AllFeatures));
}
// Authentication data.
if (hasAuth)
{
BinaryUtils.Marshaller.Marshal(stream, writer =>
{
writer.WriteString(clientConfiguration.UserName);
writer.WriteString(clientConfiguration.Password);
});
}
}, 12, out messageLen);
SocketWrite(buf, messageLen);
// Decode response.
var res = ReceiveMessage();
using (var stream = new BinaryHeapStream(res))
{
// Read input.
var success = stream.ReadBool();
if (success)
{
// Success reply layout: [feature bits (1.7.0+)] [server node id (1.4.0+)].
BitArray featureBits = null;
if (hasFeatures)
{
featureBits = new BitArray(BinaryUtils.Marshaller.Unmarshal<byte[]>(stream));
}
if (version >= Ver140)
{
ServerNodeId = BinaryUtils.Marshaller.Unmarshal<Guid>(stream);
}
ServerVersion = version;
_logger.Debug("Handshake completed on {0}, protocol version = {1}",
_socket.RemoteEndPoint, version);
return new ClientFeatures(version, featureBits);
}
// Failure reply layout: server version (3 shorts), error message, optional status code.
ServerVersion =
new ClientProtocolVersion(stream.ReadShort(), stream.ReadShort(), stream.ReadShort());
var errMsg = BinaryUtils.Marshaller.Unmarshal<string>(stream);
// The status code is read only when the reply has bytes left; otherwise a generic failure is assumed.
ClientStatusCode errCode = ClientStatusCode.Fail;
if (stream.Remaining > 0)
{
errCode = (ClientStatusCode) stream.ReadInt();
}
_logger.Debug("Handshake failed on {0}, requested protocol version = {1}, " +
"server protocol version = {2}, status = {3}, message = {4}",
_socket.RemoteEndPoint, version, ServerVersion, errCode, errMsg);
// Authentication error is handled immediately.
if (errCode == ClientStatusCode.AuthenticationFailed)
{
throw new IgniteClientException(errMsg, null, ClientStatusCode.AuthenticationFailed);
}
// Retry if server version is different and falls within supported version range.
var retry = ServerVersion != version &&
ServerVersion >= Ver100 &&
ServerVersion <= CurrentProtocolVersion;
if (retry)
{
_logger.Debug("Retrying handshake on {0} with protocol version {1}",
_socket.RemoteEndPoint, ServerVersion);
// The retry reuses the same connection and requests the version reported by the server.
return Handshake(clientConfiguration, ServerVersion);
}
throw new IgniteClientException(string.Format(
"Client handshake failed: '{0}'. Client version: {1}. Server version: {2}",
errMsg, version, ServerVersion), null, errCode);
}
}
/// <summary>
/// Receives one message from the socket: a 4-byte size prefix followed by that many bytes of payload.
/// </summary>
/// <returns>Message payload (without the size prefix).</returns>
private byte[] ReceiveMessage()
{
    var sizePrefix = ReceiveBytes(4);
    var payloadSize = GetInt(sizePrefix);

    return ReceiveBytes(payloadSize);
}
/// <summary>
/// Receives exactly <paramref name="size"/> bytes from the socket.
/// </summary>
/// <param name="size">Number of bytes to receive; expected to be positive.</param>
/// <returns>Buffer of length <paramref name="size"/> filled with the received data.</returns>
/// <exception cref="IgniteClientException">The connection was closed before all bytes arrived
/// (the socket is disposed in that case).</exception>
private byte[] ReceiveBytes(int size)
{
    Debug.Assert(size > 0);

    var buf = new byte[size];
    var received = 0;

    // Socket.Receive can return any number of bytes, even 1.
    // We should repeat Receive calls until required amount of data has been received.
    // Every read, including the first one, goes through the same zero-length check,
    // so a closed connection is detected right away instead of after one more read.
    while (received < size)
    {
        var res = SocketRead(buf, received, size - received);

        if (res == 0)
        {
            // Disconnected.
            _logger.Debug("Connection lost on {0} (failed to read data from socket)", _socket.RemoteEndPoint);

            // Keep an earlier failure if there is one: it is the root cause.
            _exception = _exception ?? new SocketException((int) SocketError.ConnectionAborted);

            Dispose();

            // Always throws here, because _exception is set.
            CheckException();
        }

        received += res;
    }

    return buf;
}
/// <summary>
/// Sends the request synchronously: writes the request and reads the response on the calling thread.
/// <para />
/// This path is used only when the socket is not in async mode and both the send and the receive locks
/// can be taken without waiting; otherwise null is returned and nothing is written to the socket.
/// </summary>
/// <param name="reqMsg">Request message.</param>
/// <returns>Response stream positioned after the response id,
/// or null when the synchronous path can not be used.</returns>
private BinaryHeapStream SendRequest(ref RequestMessage reqMsg)
{
// Do not enter lock when disposed.
CheckException();
// If there are no pending async requests, we can execute this operation synchronously,
// which is more efficient.
if (IsAsyncMode)
{
return null;
}
var sendLockTaken = false;
var receiveLockTaken = false;
try
{
// Zero timeout: never wait for a lock, give up on the sync path instead.
Monitor.TryEnter(_sendRequestSyncRoot, 0, ref sendLockTaken);
if (!sendLockTaken)
{
return null;
}
Monitor.TryEnter(_receiveMessageSyncRoot, 0, ref receiveLockTaken);
if (!receiveLockTaken)
{
return null;
}
// Re-check state now that both locks are held: it may have changed since the checks above.
CheckException();
if (IsAsyncMode)
{
return null;
}
SocketWrite(reqMsg.Buffer, reqMsg.Length);
// Sync operations rely on stream timeout.
if (!_isReadTimeoutEnabled)
{
_stream.ReadTimeout = _stream.WriteTimeout;
_isReadTimeoutEnabled = true;
}
var respMsg = ReceiveMessage();
var response = new BinaryHeapStream(respMsg);
var responseId = response.ReadLong();
// The response id is only verified in debug builds.
Debug.Assert(responseId == reqMsg.Id);
return response;
}
finally
{
// Release only the locks that were actually taken.
if (sendLockTaken)
{
Monitor.Exit(_sendRequestSyncRoot);
}
if (receiveLockTaken)
{
Monitor.Exit(_receiveMessageSyncRoot);
}
}
}
/// <summary>
/// Registers the request as pending, writes it to the socket, and returns the task
/// that is completed when the corresponding response (or a failure) arrives.
/// </summary>
/// <param name="reqMsg">Request message.</param>
/// <returns>Task with the response stream.</returns>
private Task<BinaryHeapStream> SendRequestAsync(ref RequestMessage reqMsg)
{
    // Do not enter lock when disposed.
    CheckException();

    lock (_sendRequestSyncRoot)
    {
        CheckException();

        // The request is registered before the write.
        var pending = new Request();
        var registered = _requests.TryAdd(reqMsg.Id, pending);
        Debug.Assert(registered);

        SocketWrite(reqMsg.Buffer, reqMsg.Length);

        // Wake up the background receiver.
        _listenerEvent.Set();

        return pending.CompletionSource.Task;
    }
}
/// <summary>
/// Builds a length-prefixed message in a byte array: a 4-byte size placeholder is written first,
/// then the payload, and finally the placeholder is overwritten with the payload size.
/// </summary>
/// <param name="writeAction">Writes the payload.</param>
/// <param name="bufSize">Initial buffer size.</param>
/// <param name="messageLen">Total message length in bytes, including the size prefix.</param>
/// <returns>Underlying buffer; only the first <paramref name="messageLen"/> bytes belong to the message.</returns>
private static byte[] WriteMessage(Action<IBinaryStream> writeAction, int bufSize, out int messageLen)
{
    var output = new BinaryHeapStream(bufSize);

    // Reserve message size.
    output.WriteInt(0);

    writeAction(output);

    // Write message size (everything after the 4-byte prefix).
    var payloadSize = output.Position - 4;
    output.WriteInt(0, payloadSize);

    messageLen = output.Position;

    return output.GetArray();
}
/// <summary>
/// Builds a request message: size prefix, operation code, freshly generated request id,
/// and the payload produced by <paramref name="writeAction"/>.
/// </summary>
/// <param name="writeAction">Writes the request payload; may be null.</param>
/// <param name="opId">Operation code; validated against the connection features first.</param>
/// <returns>Request message with its id, buffer and length.</returns>
private RequestMessage WriteMessage(Action<ClientRequestContext> writeAction, ClientOp opId)
{
    _features.ValidateOp(opId);

    var id = Interlocked.Increment(ref _requestId);

    // Potential perf improvements:
    // * ArrayPool<T>
    // * Write to socket stream directly (not trivial because of unknown size)
    var output = new BinaryHeapStream(256);

    // Reserve message size.
    output.WriteInt(0);

    output.WriteShort((short) opId);
    output.WriteLong(id);

    if (writeAction != null)
    {
        var requestContext = new ClientRequestContext(output, this);

        writeAction(requestContext);
        requestContext.FinishMarshal();
    }

    // Write message size (everything after the 4-byte prefix).
    var payloadSize = output.Position - 4;
    output.WriteInt(0, payloadSize);

    return new RequestMessage(id, output.GetArray(), output.Position);
}
/// <summary>
/// Writes to the socket. All socket writes should go through this method,
/// so that any write failure is recorded and the socket is disposed before the exception propagates.
/// </summary>
/// <param name="buf">Buffer with the data.</param>
/// <param name="len">Number of bytes to write, starting at the beginning of the buffer.</param>
private void SocketWrite(byte[] buf, int len)
{
try
{
_stream.Write(buf, 0, len);
}
catch (Exception e)
{
// Record the failure first: Dispose completes pending requests with the recorded exception.
_exception = e;
Dispose();
throw;
}
}
/// <summary>
/// Reads from the socket. All socket reads should go through this method,
/// so that any read failure is recorded and the socket is disposed before the exception propagates.
/// </summary>
/// <param name="buf">Buffer to read into.</param>
/// <param name="pos">Position in the buffer to start writing at.</param>
/// <param name="len">Maximum number of bytes to read.</param>
/// <returns>Number of bytes read, as returned by the underlying stream.</returns>
private int SocketRead(byte[] buf, int pos, int len)
{
try
{
return _stream.Read(buf, pos, len);
}
catch (Exception e)
{
// Record the failure first: Dispose completes pending requests with the recorded exception.
_exception = e;
Dispose();
throw;
}
}
/// <summary>
/// Creates a blocking TCP socket configured from <paramref name="cfg"/> and connects it
/// to <paramref name="endPoint"/>. When configuration or connection fails,
/// the socket is closed before the exception propagates.
/// </summary>
/// <param name="cfg">Client configuration (no-delay flag, timeout, buffer sizes).</param>
/// <param name="endPoint">End point to connect to.</param>
/// <param name="logger">Logger.</param>
/// <returns>Connected socket; the caller owns it.</returns>
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope",
    Justification = "Socket is returned from this method.")]
private static Socket Connect(IgniteClientConfiguration cfg, EndPoint endPoint, ILogger logger)
{
    var socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

    try
    {
        socket.NoDelay = cfg.TcpNoDelay;
        socket.Blocking = true;
        socket.SendTimeout = (int) cfg.SocketTimeout.TotalMilliseconds;

        // Buffer sizes are applied only when they differ from the default value.
        if (cfg.SocketSendBufferSize != IgniteClientConfiguration.DefaultSocketBufferSize)
        {
            socket.SendBufferSize = cfg.SocketSendBufferSize;
        }

        if (cfg.SocketReceiveBufferSize != IgniteClientConfiguration.DefaultSocketBufferSize)
        {
            socket.ReceiveBufferSize = cfg.SocketReceiveBufferSize;
        }

        logger.Debug("Socket connection attempt: {0}", endPoint);

        socket.Connect(endPoint);

        logger.Debug("Socket connection established: {0} -> {1}", socket.LocalEndPoint, socket.RemoteEndPoint);

        return socket;
    }
    catch
    {
        // The socket never reaches the caller on failure: release it here
        // instead of leaving the handle to the finalizer.
        socket.Close();

        throw;
    }
}
/// <summary>
/// Gets the stream for the given socket: a <see cref="NetworkStream"/> that owns the socket,
/// wrapped by the configured SSL stream factory when there is one.
/// When the stream can not be set up, the network stream (and the socket it owns) is closed
/// before the exception propagates.
/// </summary>
/// <param name="socket">Connected socket.</param>
/// <param name="cfg">Client configuration (timeout, SSL stream factory).</param>
/// <param name="host">Host name passed to the SSL stream factory.</param>
/// <returns>Stream to use for all socket reads and writes.</returns>
private static Stream GetSocketStream(Socket socket, IgniteClientConfiguration cfg, string host)
{
    var stream = new NetworkStream(socket, ownsSocket: true);

    try
    {
        stream.WriteTimeout = (int) cfg.SocketTimeout.TotalMilliseconds;

        if (cfg.SslStreamFactory == null)
        {
            return stream;
        }

        return cfg.SslStreamFactory.Create(stream, host);
    }
    catch
    {
        // The caller never receives a stream on failure, so it has nothing to close.
        stream.Close();

        throw;
    }
}
/// <summary>
/// Timer callback: fails every pending request that has been waiting longer than the timeout.
/// <para />
/// A timed out request is not removed from the map; its entry is replaced with null,
/// so that a late response with this id is still recognized by <see cref="HandleResponse"/>.
/// </summary>
/// <param name="_">Timer state (not used).</param>
private void CheckTimeouts(object _)
{
    // Skip this run if the previous callback is still in progress.
    if (_checkingTimeouts)
    {
        return;
    }

    _checkingTimeouts = true;

    try
    {
        if (_exception != null)
        {
            // Socket has failed: stop the timer. Pending requests are completed with the
            // failure by Dispose, which always follows a recorded exception.
            _timeoutCheckTimer.Dispose();

            return;
        }

        foreach (var pair in _requests)
        {
            var req = pair.Value;

            if (req == null || req.Duration <= _timeout)
            {
                continue;
            }

            // Replace the entry only if it is still the same request. A plain indexer assignment
            // would re-insert the key when the response has been handled (and the entry removed)
            // in the meantime, leaving a null entry that nothing ever removes and that keeps
            // the socket in async mode.
            if (_requests.TryUpdate(pair.Key, null, req))
            {
                req.CompletionSource.TrySetException(new SocketException((int) SocketError.TimedOut));
            }
        }
    }
    finally
    {
        _checkingTimeouts = false;
    }
}
/// <summary>
/// Gets the int from the start of the buffer by delegating to BinaryHeapStream.ReadInt0.
/// </summary>
/// <param name="buf">Buffer to read from. NOTE(review): presumably must hold at least 4 bytes;
/// the only caller in this class passes a 4-byte buffer.</param>
/// <returns>Decoded value.</returns>
private static unsafe int GetInt(byte[] buf)
{
// Pin the array so that a raw pointer to its first byte can be passed on.
fixed (byte* b = buf)
{
return BinaryHeapStream.ReadInt0(b);
}
}
/// <summary>
/// Throws when a socket failure has been recorded; does nothing otherwise.
/// </summary>
/// <exception cref="IgniteClientException">The socket has failed; the recorded failure
/// is available as the inner exception.</exception>
private void CheckException()
{
    // Read the volatile field once, so the check and the throw see the same value.
    var failure = _exception;

    if (failure == null)
    {
        return;
    }

    throw new IgniteClientException("Client connection has failed. Examine InnerException for details", failure);
}
/// <summary>
/// Completes all pending requests with the recorded socket failure
/// and passes the same failure to all notification handlers, removing both from their maps.
/// </summary>
private void EndRequestsWithError()
{
    var failure = _exception;
    Debug.Assert(failure != null);

    // Drain until empty: entries may still be added while a pass is in progress.
    while (!_requests.IsEmpty)
    {
        var pendingIds = _requests.Keys.ToArray();

        for (var i = 0; i < pendingIds.Length; i++)
        {
            Request pending;

            // A null entry is a request that has timed out already: its task is completed.
            if (!_requests.TryRemove(pendingIds[i], out pending) || pending == null)
            {
                continue;
            }

            pending.CompletionSource.TrySetException(failure);
        }
    }

    while (!_notificationListeners.IsEmpty)
    {
        foreach (var notificationId in _notificationListeners.Keys)
        {
            ClientNotificationHandler listener;

            if (!_notificationListeners.TryRemove(notificationId, out listener))
            {
                continue;
            }

            listener.Handle(null, failure);
        }
    }
}
/// <summary>
/// Disposes the socket: marks it as disposed, completes all pending requests and notification handlers
/// with an error, closes the stream, and releases the receiver event and the timeout timer.
/// Safe to call multiple times and from multiple threads; only the first call has an effect.
/// </summary>
[SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly",
Justification = "There is no finalizer.")]
public void Dispose()
{
lock (_disposeSyncRoot)
{
if (_isDisposed)
{
return;
}
// Set disposed state before ending requests so that request continuations see disconnected socket.
_isDisposed = true;
// Keep the recorded failure if there is one; otherwise this is a regular dispose.
_exception = _exception ?? new ObjectDisposedException(typeof(ClientSocket).FullName);
EndRequestsWithError();
// This will call Socket.Shutdown and Socket.Close.
_stream.Close();
// Wake up the receiver thread so that it can observe the failure and exit.
_listenerEvent.Set();
_listenerEvent.Dispose();
// The timer exists only when a positive timeout is configured.
if (_timeoutCheckTimer != null)
{
_timeoutCheckTimer.Dispose();
}
}
}
/** <inheritDoc /> */
public override string ToString()
{
    var remote = RemoteEndPoint;

    return string.Format("ClientSocket [RemoteEndPoint={0}]", remote);
}
/// <summary>
/// Represents a pending request: the completion source for its response
/// and the time elapsed since the request was created.
/// </summary>
private class Request
{
    /** Completion source for the response. */
    private readonly TaskCompletionSource<BinaryHeapStream> _completionSource;

    /** Measures time since creation. Stopwatch does not depend on the wall clock,
     * so daylight saving transitions and system clock adjustments do not distort request durations. */
    private readonly Stopwatch _stopwatch;

    /// <summary>
    /// Initializes a new instance of the <see cref="Request"/> class and starts measuring its duration.
    /// </summary>
    public Request()
    {
        _completionSource = new TaskCompletionSource<BinaryHeapStream>();
        _stopwatch = Stopwatch.StartNew();
    }

    /// <summary>
    /// Gets the completion source.
    /// </summary>
    public TaskCompletionSource<BinaryHeapStream> CompletionSource
    {
        get { return _completionSource; }
    }

    /// <summary>
    /// Gets the time elapsed since this request was created.
    /// </summary>
    public TimeSpan Duration
    {
        get { return _stopwatch.Elapsed; }
    }
}
/// <summary>
/// Immutable description of an encoded request: its id, the buffer that holds it,
/// and the number of bytes in the buffer that belong to the message.
/// </summary>
private struct RequestMessage
{
    /** Request id. */
    public readonly long Id;

    /** Buffer with the encoded message. */
    public readonly byte[] Buffer;

    /** Message length in bytes. */
    public readonly int Length;

    /// <summary>
    /// Initializes a new instance of the <see cref="RequestMessage"/> struct.
    /// </summary>
    /// <param name="id">Request id.</param>
    /// <param name="buffer">Buffer with the encoded message.</param>
    /// <param name="length">Message length in bytes.</param>
    public RequestMessage(long id, byte[] buffer, int length)
    {
        Id = id;
        Buffer = buffer;
        Length = length;
    }
}
}
}