blob: d6d105524899fe74133a2ae31e5d2d56df6b3bc0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Impl.Client.Datastream
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Datastream;
using Apache.Ignite.Core.Datastream;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Datastream;
using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter;
/// <summary>
/// Thin client data streamer.
/// </summary>
internal sealed class DataStreamerClient<TK, TV> : IDataStreamerClient<TK, TV>
{
/** Streamer flags. */
[Flags]
private enum Flags : byte
{
AllowOverwrite = 0x01,
SkipStore = 0x02,
KeepBinary = 0x04,
Flush = 0x08,
Close = 0x10
}
/** */
private const int ServerBufferSizeAuto = -1;
/** */
private const int MaxRetries = 16;
/** */
private readonly ClientFailoverSocket _socket;
/** */
private readonly int _cacheId;
/** */
private readonly string _cacheName;
/** */
private readonly DataStreamerClientOptions<TK, TV> _options;
/** */
private readonly ConcurrentDictionary<ClientSocket, DataStreamerClientPerNodeBuffer<TK, TV>> _buffers =
new ConcurrentDictionary<ClientSocket, DataStreamerClientPerNodeBuffer<TK, TV>>();
/** */
[SuppressMessage("Microsoft.Design", "CA2213:DisposableFieldsShouldBeDisposed",
Justification = "WaitHandle is not used in ReaderWriterLockSlim, no need to dispose.")]
private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
/** Cached flags. */
private readonly Flags _flags;
/** */
private readonly ConcurrentStack<DataStreamerClientEntry<TK, TV>[]> _arrayPool
= new ConcurrentStack<DataStreamerClientEntry<TK, TV>[]>();
private readonly Timer _autoFlushTimer;
/** */
private int _arraysAllocated;
/** */
private long _entriesSent;
/** Exception. When set, the streamer is closed. */
private volatile Exception _exception;
/** Cancelled flag. */
private volatile bool _cancelled;
/// <summary>
/// Initializes a new instance of <see cref="DataStreamerClient{TK,TV}"/>.
/// </summary>
/// <param name="socket">Socket.</param>
/// <param name="cacheName">Cache name.</param>
/// <param name="options">Options.</param>
public DataStreamerClient(
ClientFailoverSocket socket,
string cacheName,
DataStreamerClientOptions<TK, TV> options)
{
Debug.Assert(socket != null);
Debug.Assert(!string.IsNullOrEmpty(cacheName));
// Copy to prevent modification.
_options = new DataStreamerClientOptions<TK, TV>(options);
_socket = socket;
_cacheName = cacheName;
_cacheId = BinaryUtils.GetCacheId(cacheName);
_flags = GetFlags(_options);
var interval = _options.AutoFlushInterval;
if (interval != TimeSpan.Zero)
{
_autoFlushTimer = new Timer(_ => AutoFlush(), null, interval, interval);
}
}
/** <inheritdoc /> */
public void Dispose()
{
Close(cancel: false);
}
/** <inheritdoc /> */
public string CacheName
{
get { return _cacheName; }
}
/** <inheritdoc /> */
public bool IsClosed
{
get { return _exception != null; }
}
/** <inheritdoc /> */
public DataStreamerClientOptions<TK, TV> Options
{
get
{
// Copy to prevent modification.
return new DataStreamerClientOptions<TK, TV>(_options);
}
}
/** <inheritdoc /> */
public void Add(TK key, TV val)
{
IgniteArgumentCheck.NotNull(key, "key");
Add(new DataStreamerClientEntry<TK, TV>(key, val));
}
/** <inheritdoc /> */
public void Remove(TK key)
{
IgniteArgumentCheck.NotNull(key, "key");
if (!_options.AllowOverwrite)
{
throw new IgniteClientException("DataStreamer can't remove data when AllowOverwrite is false.");
}
Add(new DataStreamerClientEntry<TK, TV>(key));
}
/** <inheritdoc /> */
public void Flush()
{
FlushAsync().Wait();
}
/** <inheritdoc /> */
public Task FlushAsync()
{
ThrowIfClosed();
return FlushInternalAsync();
}
/** <inheritdoc /> */
public void Close(bool cancel)
{
CloseAsync(cancel).Wait();
}
/** <inheritdoc /> */
public Task CloseAsync(bool cancel)
{
_rwLock.EnterWriteLock();
try
{
if (_exception != null)
{
// Already closed.
return TaskRunner.CompletedTask;
}
_exception = new ObjectDisposedException("DataStreamerClient", "Data streamer has been disposed");
if (_autoFlushTimer != null)
{
_autoFlushTimer.Dispose();
}
if (cancel)
{
// Disregard current buffers, stop all retry loops.
_cancelled = true;
return TaskRunner.CompletedTask;
}
return FlushInternalAsync();
}
finally
{
_rwLock.ExitWriteLock();
}
}
/** <inheritdoc /> */
public override string ToString()
{
return string.Format("{0} [CacheName={1}, IsClosed={2}]", GetType().Name, CacheName, IsClosed);
}
/// <summary>
/// Gets the count of sent entries.
/// </summary>
internal long EntriesSent
{
get { return Interlocked.CompareExchange(ref _entriesSent, -1, -1); }
}
/// <summary>
/// Gets the count of allocated arrays.
/// </summary>
internal int ArraysAllocated
{
get { return Interlocked.CompareExchange(ref _arraysAllocated, -1, -1); }
}
/// <summary>
/// Gets the count of pooled arrays.
/// </summary>
internal int ArraysPooled
{
get { return _arrayPool.Count; }
}
/// <summary>
/// Gets the pooled entry array.
/// </summary>
internal DataStreamerClientEntry<TK, TV>[] GetPooledArray()
{
DataStreamerClientEntry<TK,TV>[] res;
if (_arrayPool.TryPop(out res))
{
// Reset buffer and return.
Array.Clear(res, 0, res.Length);
return res;
}
Interlocked.Increment(ref _arraysAllocated);
res = new DataStreamerClientEntry<TK, TV>[_options.PerNodeBufferSize];
return res;
}
/// <summary>
/// Returns entry array to the pool.
/// </summary>
internal void ReturnPooledArray(DataStreamerClientEntry<TK, TV>[] buffer)
{
_arrayPool.Push(buffer);
}
/// <summary>
/// Adds an entry to the streamer.
/// </summary>
private void Add(DataStreamerClientEntry<TK, TV> entry)
{
if (!_rwLock.TryEnterReadLock(0))
{
throw new ObjectDisposedException("DataStreamerClient", "Data streamer has been disposed");
}
try
{
ThrowIfClosed();
AddNoLock(entry);
}
finally
{
_rwLock.ExitReadLock();
}
}
/// <summary>
/// Adds an entry without RW lock.
/// </summary>
private void AddNoLock(DataStreamerClientEntry<TK, TV> entry)
{
var retries = MaxRetries;
while (!_cancelled)
{
try
{
var socket = _socket.GetAffinitySocket(_cacheId, entry.Key) ?? _socket.GetSocket();
var buffer = GetOrAddPerNodeBuffer(socket);
if (buffer.Add(entry))
{
return;
}
}
catch (Exception e)
{
if (ShouldRetry(e) && retries --> 0)
{
continue;
}
throw;
}
}
}
/// <summary>
/// Flushes the streamer asynchronously.
/// </summary>
private Task FlushInternalAsync()
{
if (_buffers.IsEmpty)
{
return TaskRunner.CompletedTask;
}
var tasks = new List<Task>(_buffers.Count);
foreach (var pair in _buffers)
{
var buffer = pair.Value;
var task = buffer.FlushAllAsync();
if (task != null && !task.IsCompleted)
{
tasks.Add(task);
}
}
return TaskRunner.WhenAll(tasks.ToArray());
}
/// <summary>
/// Flushes the specified buffer asynchronously.
/// </summary>
internal Task FlushBufferAsync(
DataStreamerClientBuffer<TK, TV> buffer,
ClientSocket socket,
SemaphoreSlim semaphore)
{
semaphore.Wait();
var tcs = new TaskCompletionSource<object>();
FlushBufferInternalAsync(buffer, socket, tcs);
return tcs.Task.ContWith(t =>
{
semaphore.Release();
_exception = _exception ?? t.Exception;
return t.Result;
}, TaskContinuationOptions.ExecuteSynchronously);
}
/// <summary>
/// Flushes the specified buffer asynchronously.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
Justification = "Any exception should be propagated to TaskCompletionSource.")]
private void FlushBufferInternalAsync(
DataStreamerClientBuffer<TK, TV> buffer,
ClientSocket socket,
TaskCompletionSource<object> tcs)
{
try
{
socket.DoOutInOpAsync(
ClientOp.DataStreamerStart,
ctx => WriteBuffer(buffer, ctx.Writer),
ctx => (object)null,
syncCallback: true)
.ContWith(
t => FlushBufferCompleteOrRetry(buffer, socket, tcs, t.Exception),
TaskContinuationOptions.ExecuteSynchronously);
}
catch (Exception exception)
{
FlushBufferCompleteOrRetry(buffer, socket, tcs, exception);
}
}
/// <summary>
/// Completes or retries the buffer flush operation.
/// </summary>
private void FlushBufferCompleteOrRetry(
DataStreamerClientBuffer<TK, TV> buffer,
ClientSocket socket,
TaskCompletionSource<object> tcs,
Exception exception)
{
if (exception == null)
{
// Successful flush.
Interlocked.Add(ref _entriesSent, buffer.Count);
ReturnPooledArray(buffer.Entries);
tcs.SetResult(null);
return;
}
if (_cancelled || (!socket.IsDisposed && !ShouldRetry(exception)))
{
// Socket is still connected: this error does not need to be retried.
ReturnPooledArray(buffer.Entries);
tcs.SetException(exception);
return;
}
// Release receiver thread, perform retry on a separate thread.
ThreadPool.QueueUserWorkItem(_ => FlushBufferRetry(buffer, socket, tcs));
}
/// <summary>
/// Retries the buffer flush operation.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
Justification = "Any exception should be propagated to TaskCompletionSource.")]
private void FlushBufferRetry(
DataStreamerClientBuffer<TK, TV> buffer,
ClientSocket failedSocket,
TaskCompletionSource<object> tcs)
{
try
{
// Connection failed. Remove disconnected socket from the map.
DataStreamerClientPerNodeBuffer<TK, TV> removed;
_buffers.TryRemove(failedSocket, out removed);
// Re-add entries to other buffers.
ReAddEntriesAndReturnBuffer(buffer);
if (removed != null)
{
var remaining = removed.Close();
while (remaining != null)
{
if (remaining.MarkFlushed())
{
ReAddEntriesAndReturnBuffer(remaining);
}
remaining = remaining.Previous;
}
}
// Note: if initial flush was caused by full buffer, not requested by the user,
// we don't need to force flush everything here - just re-add entries to other buffers.
FlushInternalAsync().ContWith(flushTask => flushTask.SetAsResult(tcs));
}
catch (Exception e)
{
tcs.SetException(e);
}
}
/// <summary>
/// Re-adds obsolete buffer entries to new buffers and returns the array to the pool.
/// </summary>
private void ReAddEntriesAndReturnBuffer(DataStreamerClientBuffer<TK, TV> buffer)
{
var count = buffer.Count;
var entries = buffer.Entries;
for (var i = 0; i < count; i++)
{
var entry = entries[i];
if (!entry.IsEmpty)
{
AddNoLock(entry);
}
}
ReturnPooledArray(entries);
}
/// <summary>
/// Writes buffer data to the specified writer.
/// </summary>
private void WriteBuffer(DataStreamerClientBuffer<TK, TV> buffer, BinaryWriter w)
{
w.WriteInt(_cacheId);
w.WriteByte((byte) _flags);
w.WriteInt(ServerBufferSizeAuto); // Server per-node buffer size.
w.WriteInt(ServerBufferSizeAuto); // Server per-thread buffer size.
if (_options.Receiver != null)
{
var rcvHolder = new StreamReceiverHolder(_options.Receiver,
(rec, grid, cache, stream, keepBinary) =>
StreamReceiverHolder.InvokeReceiver((IStreamReceiver<TK, TV>) rec, grid, cache, stream,
keepBinary));
w.WriteObjectDetached(rcvHolder);
w.WriteByte(ClientPlatformId.Dotnet);
}
else
{
w.WriteObject<object>(null);
}
var count = buffer.Count;
w.WriteInt(count);
var entries = buffer.Entries;
for (var i = 0; i < count; i++)
{
var entry = entries[i];
if (entry.IsEmpty)
{
continue;
}
w.WriteObjectDetached(entry.Key);
if (entry.Remove)
{
w.WriteObject<object>(null);
}
else
{
w.WriteObjectDetached(entry.Val);
}
}
}
/// <summary>
/// Gets or adds per-node buffer for the specified socket.
/// </summary>
private DataStreamerClientPerNodeBuffer<TK, TV> GetOrAddPerNodeBuffer(ClientSocket socket)
{
DataStreamerClientPerNodeBuffer<TK,TV> res;
if (_buffers.TryGetValue(socket, out res))
{
return res;
}
var candidate = new DataStreamerClientPerNodeBuffer<TK, TV>(this, socket);
res = _buffers.GetOrAdd(socket, candidate);
if (res != candidate)
{
// Another thread won - return array to the pool.
ReturnPooledArray(candidate.Close().Entries);
}
return res;
}
/// <summary>
/// Throws an exception if current streamer instance is closed.
/// </summary>
private void ThrowIfClosed()
{
var ex = _exception;
if (ex == null)
{
return;
}
if (ex is ObjectDisposedException)
{
throw new ObjectDisposedException("Streamer is closed.");
}
throw new IgniteClientException("Streamer is closed with error, check inner exception for details.", ex);
}
/// <summary>
/// Performs timer-based automatic flush.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
Justification = "Streamer will be closed if an exception occurs during automated flush. " +
"Timer thread should not throw.")]
private void AutoFlush()
{
if (_exception != null)
{
return;
}
// Prevent multiple parallel timer calls.
if (!Monitor.TryEnter(_autoFlushTimer))
{
return;
}
try
{
// Initiate flush, don't wait for completion.
FlushInternalAsync();
}
catch (Exception)
{
// Ignore.
}
finally
{
Monitor.Exit(_autoFlushTimer);
}
}
/// <summary>
/// Gets a value indicating whether flush should be retried after the specified exception.
/// </summary>
private static bool ShouldRetry(Exception exception)
{
while (exception.InnerException != null)
{
exception = exception.InnerException;
}
if (exception is SocketException || exception is IOException)
{
return true;
}
var clientEx = exception as IgniteClientException;
if (clientEx != null && clientEx.StatusCode == ClientStatusCode.InvalidNodeState)
{
return true;
}
return false;
}
/// <summary>
/// Gets the flags.
/// </summary>
private static Flags GetFlags(DataStreamerClientOptions options)
{
var flags = Flags.Flush | Flags.Close;
if (options.AllowOverwrite)
{
flags |= Flags.AllowOverwrite;
}
if (options.SkipStore)
{
flags |= Flags.SkipStore;
}
if (options.ReceiverKeepBinary)
{
flags |= Flags.KeepBinary;
}
return flags;
}
}
}