blob: 8a256ed2b4006d9c9fe47bfe0152fa6621341d21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Impl.Datastream
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Datastream;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Common;
/// <summary>
/// Non-generic internal data streamer interface. It lets code that does not know
/// the key and value type arguments of a streamer deliver callbacks to it.
/// </summary>
internal interface IDataStreamer
{
/// <summary>
/// Callback invoked on topology size change.
/// </summary>
/// <param name="topVer">New topology version.</param>
/// <param name="topSize">New topology size.</param>
void TopologyChange(long topVer, int topSize);
}
/// <summary>
/// Data streamer implementation.
/// </summary>
internal class DataStreamerImpl<TK, TV> : PlatformDisposableTargetAdapter, IDataStreamer, IDataStreamer<TK, TV>
{
#pragma warning disable 0420
/** Policy: continue. */
internal const int PlcContinue = 0;
/** Policy: close. */
internal const int PlcClose = 1;
/** Policy: cancel and close. */
internal const int PlcCancelClose = 2;
/** Policy: flush. */
internal const int PlcFlush = 3;
/** Operation: update. */
private const int OpUpdate = 1;
/** Operation: set receiver. */
private const int OpReceiver = 2;
/** Operation: get allow-overwrite flag. */
private const int OpAllowOverwrite = 3;
/** Operation: set allow-overwrite flag. */
private const int OpSetAllowOverwrite = 4;
/** Operation: get skip-store flag. */
private const int OpSkipStore = 5;
/** Operation: set skip-store flag. */
private const int OpSetSkipStore = 6;
/** Operation: get per-node buffer size. */
private const int OpPerNodeBufferSize = 7;
/** Operation: set per-node buffer size. */
private const int OpSetPerNodeBufferSize = 8;
/** Operation: get per-node parallel operations value. */
private const int OpPerNodeParallelOps = 9;
/** Operation: set per-node parallel operations value. */
private const int OpSetPerNodeParallelOps = 10;
/** Operation: start topology listening (issued from the constructor with this streamer's handle). */
private const int OpListenTopology = 11;
/** Operation: get timeout. */
private const int OpGetTimeout = 12;
/** Operation: set timeout. */
private const int OpSetTimeout = 13;
/** Operation: get per-thread buffer size. */
private const int OpPerThreadBufferSize = 14;
/** Operation: set per-thread buffer size. */
private const int OpSetPerThreadBufferSize = 15;
/** Cache name. */
private readonly string _cacheName;
/** Lock: taken in read mode by property getters and Update, in write mode by setters, TopologyChange and Close. */
[SuppressMessage("Microsoft.Design", "CA2213:DisposableFieldsShouldBeDisposed",
Justification = "WaitHandle is not used in ReaderWriterLockSlim, no need to dispose.")]
private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
/** Close future: completed by Close with either a result or the close exception. */
private readonly TaskCompletionSource<object> _closeFut = new TaskCompletionSource<object>();
/** Handle under which a weak reference to this streamer is stored in the handle registry. */
private readonly long _hnd;
/** Topology version: the latest version applied by TopologyChange. */
private long _topVer;
/** Topology size; starts at 1 and is never set below 1. */
private int _topSize = 1;
/** Buffer send size: batch size at which Add0 schedules a flush (topology size times per-node buffer size). */
private volatile int _bufSndSize;
/** Current data streamer batch; null once a close has been started. */
private volatile DataStreamerBatch<TK, TV> _batch;
/** Flusher: started at the end of the constructor, stopped at the start of Close. */
private readonly Flusher<TK, TV> _flusher;
/** Receiver; null until one is set through the Receiver property. */
private volatile IStreamReceiver<TK, TV> _rcv;
/** Receiver handle; only meaningful while the receiver is not null. */
private long _rcvHnd;
/** Receiver binary mode: the keep-binary flag passed to the constructor. */
private readonly bool _keepBinary;
/// <summary>
/// Creates the streamer: registers a weak reference to it in the handle registry,
/// starts topology listening on the target and finally launches the flusher.
/// </summary>
/// <param name="target">Target.</param>
/// <param name="marsh">Marshaller.</param>
/// <param name="cacheName">Cache name.</param>
/// <param name="keepBinary">Binary flag.</param>
public DataStreamerImpl(IPlatformTargetInternal target, Marshaller marsh, string cacheName, bool keepBinary)
    : base(target)
{
    _keepBinary = keepBinary;
    _cacheName = cacheName;

    // Streaming always starts from an empty batch.
    _batch = new DataStreamerBatch<TK, TV>();

    // The registry and the flusher only get a weak reference, so that this streamer
    // could be easily dereferenced from native code without being kept alive by it.
    var selfRef = new WeakReference(this);

    _hnd = marsh.Ignite.HandleRegistry.Allocate(selfRef);

    // Start topology listening. This call will ensure that buffer size member is updated.
    DoOutInOp(OpListenTopology, _hnd);

    // Membar to ensure fields initialization before leaving constructor.
    Thread.MemoryBarrier();

    // The flusher goes last: everything it may touch is initialized by now.
    _flusher = new Flusher<TK, TV>(selfRef);

    _flusher.RunThread();
}
/** <inheritDoc /> */
public string CacheName
{
    get
    {
        // Fixed at construction time, so no locking or disposed check is needed.
        return _cacheName;
    }
}
/** <inheritDoc /> */
public bool AllowOverwrite
{
    get
    {
        _rwLock.EnterReadLock();

        try
        {
            ThrowIfDisposed();

            // The target reports the flag as a numeric True/False code.
            var code = DoOutInOp(OpAllowOverwrite);

            return code == True;
        }
        finally
        {
            _rwLock.ExitReadLock();
        }
    }
    set
    {
        _rwLock.EnterWriteLock();

        try
        {
            ThrowIfDisposed();

            // Translate the boolean into the numeric code expected by the target.
            var code = value ? True : False;

            DoOutInOp(OpSetAllowOverwrite, code);
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }
    }
}
/** <inheritDoc /> */
public bool SkipStore
{
    get
    {
        _rwLock.EnterReadLock();

        try
        {
            ThrowIfDisposed();

            // The target reports the flag as a numeric True/False code.
            var code = DoOutInOp(OpSkipStore);

            return code == True;
        }
        finally
        {
            _rwLock.ExitReadLock();
        }
    }
    set
    {
        _rwLock.EnterWriteLock();

        try
        {
            ThrowIfDisposed();

            // Translate the boolean into the numeric code expected by the target.
            var code = value ? True : False;

            DoOutInOp(OpSetSkipStore, code);
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }
    }
}
/** <inheritDoc /> */
public int PerNodeBufferSize
{
    get
    {
        _rwLock.EnterReadLock();

        try
        {
            ThrowIfDisposed();

            return (int) DoOutInOp(OpPerNodeBufferSize);
        }
        finally
        {
            _rwLock.ExitReadLock();
        }
    }
    set
    {
        _rwLock.EnterWriteLock();

        try
        {
            ThrowIfDisposed();

            // Push the new value to the target first; the local threshold is only
            // updated when that call did not throw.
            DoOutInOp(OpSetPerNodeBufferSize, value);

            // The flush threshold is topology size times per-node buffer size.
            // Multiply in 64 bits and clamp: a plain int multiplication silently wraps
            // for large values, and a zero or negative threshold makes every single
            // add trigger a flush.
            _bufSndSize = (int) Math.Min((long) _topSize * value, int.MaxValue);
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }
    }
}
/** <inheritDoc /> */
public int PerThreadBufferSize
{
    get
    {
        _rwLock.EnterReadLock();

        try
        {
            ThrowIfDisposed();

            // Ask the target for the current value and narrow it to int.
            var size = DoOutInOp(OpPerThreadBufferSize);

            return (int) size;
        }
        finally
        {
            _rwLock.ExitReadLock();
        }
    }
    set
    {
        _rwLock.EnterWriteLock();

        try
        {
            ThrowIfDisposed();

            // The value lives on the target side only; nothing is cached locally.
            DoOutInOp(OpSetPerThreadBufferSize, value);
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }
    }
}
/** <inheritDoc /> */
public int PerNodeParallelOperations
{
    get
    {
        _rwLock.EnterReadLock();

        try
        {
            ThrowIfDisposed();

            // Ask the target for the current value and narrow it to int.
            var ops = DoOutInOp(OpPerNodeParallelOps);

            return (int) ops;
        }
        finally
        {
            _rwLock.ExitReadLock();
        }
    }
    set
    {
        _rwLock.EnterWriteLock();

        try
        {
            ThrowIfDisposed();

            // The value lives on the target side only; nothing is cached locally.
            DoOutInOp(OpSetPerNodeParallelOps, value);
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }
    }
}
/** <inheritDoc /> */
public long AutoFlushFrequency
{
    get
    {
        _rwLock.EnterReadLock();

        try
        {
            ThrowIfDisposed();

            // The frequency is owned by the flusher, not by the target.
            long freq = _flusher.Frequency;

            return freq;
        }
        finally
        {
            _rwLock.ExitReadLock();
        }
    }
    set
    {
        _rwLock.EnterWriteLock();

        try
        {
            ThrowIfDisposed();

            // Hand the new frequency over to the flusher.
            _flusher.Frequency = value;
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }
    }
}
/** <inheritDoc /> */
public TimeSpan AutoFlushInterval
{
    get
    {
        // TimeSpan view over AutoFlushFrequency, which is kept in milliseconds.
        long freqMs = AutoFlushFrequency;

        return TimeSpan.FromMilliseconds(freqMs);
    }
    set
    {
        // Fractional milliseconds are truncated by the cast.
        long freqMs = (long) value.TotalMilliseconds;

        AutoFlushFrequency = freqMs;
    }
}
/** <inheritDoc /> */
public Task Task
{
    get
    {
        // Task of the close future, which Close completes.
        var closeTask = _closeFut.Task;

        return closeTask;
    }
}
/** <inheritDoc /> */
public Task GetCurrentBatchTask()
{
    // Read the volatile field once so that the check and the call see the same batch.
    var current = _batch;

    if (current == null)
    {
        // Streamer is closing. Wait for close to complete.
        return Task;
    }

    return current.GetThisAndPreviousCompletionTask();
}
/** <inheritDoc /> */
public IStreamReceiver<TK, TV> Receiver
{
    get
    {
        ThrowIfDisposed();

        return _rcv;
    }
    set
    {
        IgniteArgumentCheck.NotNull(value, "value");

        var registry = Marshaller.Ignite.HandleRegistry;

        _rwLock.EnterWriteLock();

        try
        {
            ThrowIfDisposed();

            // Same receiver instance as before: nothing to do.
            if (_rcv == value)
            {
                return;
            }

            // Wrap the receiver into a holder whose callback casts back to the typed receiver.
            var holder = new StreamReceiverHolder(value,
                (rec, grid, cache, stream, keepBinary) =>
                    StreamReceiverHolder.InvokeReceiver((IStreamReceiver<TK, TV>) rec, grid, cache, stream,
                        keepBinary));

            var newHnd = registry.Allocate(holder);

            try
            {
                DoOutOp(OpReceiver, writer =>
                {
                    writer.WriteLong(newHnd);
                    writer.WriteObject(holder);
                });
            }
            catch (Exception)
            {
                // The receiver was not installed, so the fresh handle must not stay allocated.
                registry.Release(newHnd);

                throw;
            }

            // The replaced receiver (if any) no longer needs its handle.
            if (_rcv != null)
            {
                registry.Release(_rcvHnd);
            }

            _rcv = value;
            _rcvHnd = newHnd;
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }
    }
}
/** <inheritDoc /> */
public Task AddData(TK key, TV val)
{
    ThrowIfDisposed();

    IgniteArgumentCheck.NotNull(key, "key");

    // A single key/value entry counts as one item in the batch.
    var entry = new DataStreamerEntry<TK, TV>(key, val);

    return Add0(entry, 1);
}
/** <inheritDoc /> */
public Task AddData(KeyValuePair<TK, TV> pair)
{
    ThrowIfDisposed();

    // Reject a null key up front, exactly as AddData(key, val) and RemoveData(key) do,
    // instead of letting it travel into the batch.
    IgniteArgumentCheck.NotNull(pair.Key, "pair.Key");

    // A single key/value entry counts as one item in the batch.
    return Add0(new DataStreamerEntry<TK, TV>(pair.Key, pair.Value), 1);
}
/** <inheritDoc /> */
public Task AddData(ICollection<KeyValuePair<TK, TV>> entries)
{
    ThrowIfDisposed();

    IgniteArgumentCheck.NotNull(entries, "entries");

    // The whole collection is added as one value; its size is the item count.
    int cnt = entries.Count;

    return Add0(entries, cnt);
}
/** <inheritDoc /> */
public Task RemoveData(TK key)
{
    ThrowIfDisposed();

    IgniteArgumentCheck.NotNull(key, "key");

    // A removal is queued through the same routine as an addition, as a remove entry.
    var removal = new DataStreamerRemoveEntry<TK>(key);

    return Add0(removal, 1);
}
/** <inheritDoc /> */
public void Add(TK key, TV val)
{
    // Same as AddData(key, val); the returned batch task is intentionally not observed.
    AddData(key, val);
}
/** <inheritDoc /> */
public void Add(KeyValuePair<TK, TV> pair)
{
    // Same as AddData(pair); the returned batch task is intentionally not observed.
    AddData(pair);
}
/** <inheritDoc /> */
public void Add(ICollection<KeyValuePair<TK, TV>> entries)
{
    // Same as AddData(entries); the returned batch task is intentionally not observed.
    AddData(entries);
}
/** <inheritDoc /> */
public void Remove(TK key)
{
    // Same as RemoveData(key); the returned batch task is intentionally not observed.
    RemoveData(key);
}
/** <inheritDoc /> */
public void TryFlush()
{
    // Starts a flush without waiting for it; the returned task is intentionally not observed.
    FlushAsync();
}
/** <inheritDoc /> */
public void Flush()
{
    // Synchronous variant: start the flush and block until its task completes.
    var flushTask = FlushAsync();

    flushTask.Wait();
}
/** <inheritDoc /> */
public Task FlushAsync()
{
    ThrowIfDisposed();

    // Read the volatile field once so that the flush and the returned task refer to the same batch.
    var current = _batch;

    if (current == null)
    {
        // Batch is null, i.e. data streamer is closing. Wait for close to complete.
        return Task;
    }

    // Send the batch without waiting here; callers wait on the returned task instead.
    Flush0(current, false, PlcFlush);

    return current.GetThisAndPreviousCompletionTask();
}
/** <inheritDoc /> */
public void Close(bool cancel)
{
// Stop the flusher first; Stop() returns only once the flusher loop has reported that it stopped.
_flusher.Stop();
while (true)
{
// Snapshot the current batch; a null batch means some close has already taken it.
DataStreamerBatch<TK, TV> batch0 = _batch;
if (batch0 == null)
{
// Wait for concurrent close to finish.
_closeFut.Task.Wait();
return;
}
_rwLock.EnterWriteLock();
try
{
// With a close policy Flush0 tries to swap the current batch for null, sends the batch,
// waits for completion (wait = true) and reports whether this call won the swap.
if (!Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose))
{
// Retry flushing.
continue;
}
base.Dispose(true);
ReleaseHandles();
// Complete the close future from a thread pool work item rather than inline on this thread,
// which still holds the write lock at this point.
ThreadPool.QueueUserWorkItem(_ =>_closeFut.TrySetResult(null));
return;
}
catch (Exception e)
{
// Failure path: still dispose and release handles, fail the close future with the same
// exception and rethrow it to the caller.
base.Dispose(true);
ReleaseHandles();
ThreadPool.QueueUserWorkItem(_ =>_closeFut.TrySetException(e));
throw;
}
finally
{
// Runs on every exit from the try block, including the 'continue' retry above.
_rwLock.ExitWriteLock();
}
}
}
/** <inheritDoc /> */
public IDataStreamer<TK1, TV1> WithKeepBinary<TK1, TV1>()
{
    if (!_keepBinary)
    {
        // Not in binary mode yet: request a keep-binary streamer for the same cache.
        return Marshaller.Ignite.GetDataStreamer<TK1, TV1>(_cacheName, true);
    }

    // Already in binary mode: this instance can only be reused when the requested
    // type arguments are compatible with its own.
    var typed = this as IDataStreamer<TK1, TV1>;

    if (typed != null)
    {
        return typed;
    }

    throw new InvalidOperationException(
        "Can't change type of binary streamer. WithKeepBinary has been called on an instance of " +
        "binary streamer with incompatible generic arguments.");
}
/** <inheritDoc /> */
public TimeSpan Timeout
{
    get
    {
        _rwLock.EnterReadLock();

        try
        {
            ThrowIfDisposed();

            // The target returns the timeout as a number; BinaryUtils converts it to a TimeSpan.
            var raw = DoOutInOp(OpGetTimeout);

            return BinaryUtils.LongToTimeSpan(raw);
        }
        finally
        {
            _rwLock.ExitReadLock();
        }
    }
    set
    {
        _rwLock.EnterWriteLock();

        try
        {
            ThrowIfDisposed();

            // The target takes whole milliseconds; the cast truncates any fraction.
            var millis = (long) value.TotalMilliseconds;

            DoOutInOp(OpSetTimeout, millis);
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }
    }
}
/** <inheritDoc /> */
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
protected override void Dispose(bool disposing)
{
    if (!disposing)
    {
        // Finalizer: just close Java streamer
        try
        {
            if (_batch != null)
            {
                _batch.Send(this, PlcCancelClose);
            }
        }
        // ReSharper disable once EmptyGeneralCatchClause
        catch (Exception)
        {
            // Finalizers should never throw
        }

        ReleaseHandles();
    }
    else
    {
        // Normal dispose: do not cancel
        Close(false);
    }

    base.Dispose(disposing);
}
/// <summary>
/// Releases the handle of this streamer and, when a receiver has been set,
/// the handle of that receiver.
/// </summary>
private void ReleaseHandles()
{
    var registry = Marshaller.Ignite.HandleRegistry;

    registry.Release(_hnd, true);

    // No receiver means no receiver handle was ever stored.
    if (_rcv == null)
    {
        return;
    }

    registry.Release(_rcvHnd, true);
}
/// <summary>
/// Finalizer. Runs the non-disposing branch of <see cref="Dispose(bool)"/>.
/// </summary>
~DataStreamerImpl()
{
Dispose(false);
}
/** <inheritDoc /> */
public void TopologyChange(long topVer, int topSize)
{
    _rwLock.EnterWriteLock();

    try
    {
        ThrowIfDisposed();

        // Only a strictly newer topology version is applied; stale or repeated
        // notifications are ignored.
        if (_topVer < topVer)
        {
            _topVer = topVer;
            _topSize = topSize > 0 ? topSize : 1; // Do not set to 0 to avoid 0 buffer size.

            long perNodeBufSize = DoOutInOp(OpPerNodeBufferSize);

            // The flush threshold is topology size times per-node buffer size.
            // Clamp before narrowing: an unchecked cast of a product above int.MaxValue
            // wraps to an arbitrary (possibly zero or negative) value, which would make
            // every single add trigger a flush.
            _bufSndSize = (int) Math.Min(_topSize * perNodeBufSize, int.MaxValue);
        }
    }
    finally
    {
        _rwLock.ExitWriteLock();
    }
}
/// <summary>
/// Internal add/remove routine. Adds the value to the current batch, retrying with a
/// new batch while the current one refuses additions, and schedules a flush once the
/// batch has reached the buffer send size.
/// </summary>
/// <param name="val">Value.</param>
/// <param name="cnt">Items count.</param>
/// <returns>Task for the current batch.</returns>
/// <exception cref="InvalidOperationException">The current batch is null, i.e. the streamer is stopped.</exception>
private Task Add0(object val, int cnt)
{
// Snapshot the threshold once; it is not re-read inside the retry loop.
int bufSndSize0 = _bufSndSize;
Debug.Assert(bufSndSize0 > 0);
while (true)
{
var batch0 = _batch;
if (batch0 == null)
throw new InvalidOperationException("Data streamer is stopped.");
// Add returns the batch size after the addition, or -1 when the batch no longer accepts data.
int size = batch0.Add(val, cnt);
if (size == -1)
{
// Batch is blocked, perform CAS.
// Whichever thread wins the CAS installs the next batch; either way the loop
// re-reads the current batch and tries again.
Interlocked.CompareExchange(ref _batch,
new DataStreamerBatch<TK, TV>(batch0), batch0);
continue;
}
if (size >= bufSndSize0)
// Batch is too big, schedule flush.
Flush0(batch0, false, PlcContinue);
return batch0.Task;
}
}
/// <summary>
/// Internal flush routine. Tries to replace the current batch, sends the given batch
/// and optionally waits for it and the previous batches to complete.
/// </summary>
/// <param name="curBatch">Batch to send; expected to be the current batch.</param>
/// <param name="wait">Whether to wait for flush to complete.</param>
/// <param name="plc">Policy (one of the Plc* constants). PlcContinue and PlcFlush install a
/// new batch as the current one; any other policy sets the current batch to null.</param>
/// <returns>Whether this call was able to CAS previous batch</returns>
private bool Flush0(DataStreamerBatch<TK, TV> curBatch, bool wait, int plc)
{
// 1. Try setting new current batch to help further adders.
// The swap only succeeds when the current batch is still curBatch.
bool res = Interlocked.CompareExchange(ref _batch,
(plc == PlcContinue || plc == PlcFlush) ?
new DataStreamerBatch<TK, TV>(curBatch) : null, curBatch) == curBatch;
// 2. Perform actual send.
// The send is performed regardless of whether the swap above succeeded.
Debug.Assert(curBatch != null, "curBatch != null");
curBatch.Send(this, plc);
if (wait)
// 3. Wait for all futures to finish.
curBatch.GetThisAndPreviousCompletionTask().Wait();
return res;
}
/// <summary>
/// Performs the update operation on the target while holding the read lock,
/// passing the supplied writer action to it.
/// </summary>
/// <param name="action">Writer action handed to the update operation.</param>
internal void Update(Action<BinaryWriter> action)
{
    // Read mode is enough here: Close takes the write lock, so an update never
    // overlaps with the streamer being closed.
    _rwLock.EnterReadLock();

    try
    {
        ThrowIfDisposed();

        DoOutOp(OpUpdate, action);
    }
    finally
    {
        _rwLock.ExitReadLock();
    }
}
/// <summary>
/// Flusher: a background loop that calls <c>TryFlush</c> on the streamer once per
/// configured period. The streamer is held through a weak reference only, and the
/// loop ends on its own when that reference no longer resolves.
/// </summary>
private class Flusher<TK1, TV1>
{
    /** State: running. */
    private const int StateRunning = 0;

    /** State: stopping. */
    private const int StateStopping = 1;

    /** State: stopped. */
    private const int StateStopped = 2;

    /** How long to sleep between checks while auto-flush is disabled, in milliseconds. */
    private const int IdleWaitMs = 1000;

    /** Data streamer. */
    [SuppressMessage("Microsoft.Performance", "CA1823:AvoidUnusedPrivateFields",
        Justification = "Incorrect warning")]
    private readonly WeakReference _ldrRef;

    /** Finish flag. */
    [SuppressMessage("Microsoft.Performance", "CA1823:AvoidUnusedPrivateFields",
        Justification = "Incorrect warning")]
    private int _state;

    /** Flush frequency, in milliseconds; zero or less means auto-flush is disabled. */
    [SuppressMessage("Microsoft.Performance", "CA1823:AvoidUnusedPrivateFields",
        Justification = "Incorrect warning")]
    private long _freq;

    /// <summary>
    /// Constructor.
    /// </summary>
    /// <param name="ldrRef">Data streamer weak reference.</param>
    public Flusher(WeakReference ldrRef)
    {
        _ldrRef = ldrRef;

        lock (this)
        {
            _state = StateRunning;
        }
    }

    /// <summary>
    /// Main flusher routine. Loops until a stop is requested or the streamer reference
    /// is gone, and always marks the flusher as stopped on the way out.
    /// </summary>
    private void Run()
    {
        // Set when the frequency changes from a non-zero value, to flush once right away.
        bool force = false;

        // Frequency this loop is currently working with; refreshed under the lock below.
        long curFreq = 0;

        try
        {
            while (true)
            {
                if (curFreq > 0 || force)
                {
                    var ldr = _ldrRef.Target as DataStreamerImpl<TK1, TV1>;

                    // The streamer is no longer reachable: nothing left to flush.
                    if (ldr == null)
                        return;

                    ldr.TryFlush();

                    force = false;
                }

                lock (this)
                {
                    // Stop immediately.
                    if (_state == StateStopping)
                        return;

                    if (curFreq == _freq)
                    {
                        // Frequency is unchanged.
                        if (curFreq <= 0)
                        {
                            // Auto-flush is disabled: just wait for a second and re-try.
                            // A negative frequency is handled the same way, because a negative
                            // timeout would make Monitor.Wait throw and end this loop for good.
                            Monitor.Wait(this, IdleWaitMs);
                        }
                        else
                        {
                            // Sleep for one period. Monitor.Wait takes milliseconds and accepts at
                            // most int.MaxValue, so the clamp has to be applied to milliseconds;
                            // clamping a tick count to int.MaxValue would cap the period at
                            // roughly 214 seconds.
                            int timeoutMs = curFreq > int.MaxValue ? int.MaxValue : (int) curFreq;

                            Monitor.Wait(this, timeoutMs);
                        }
                    }
                    else
                    {
                        // Frequency has changed: flush once immediately unless auto-flush
                        // was disabled before, then continue with the new value.
                        if (curFreq != 0)
                            force = true;

                        curFreq = _freq;
                    }
                }
            }
        }
        finally
        {
            // Let streamer know about stop.
            lock (this)
            {
                _state = StateStopped;

                Monitor.PulseAll(this);
            }
        }
    }

    /// <summary>
    /// Frequency, in milliseconds. Setting a different value wakes the flusher loop
    /// so that it picks the new value up without finishing its current wait.
    /// </summary>
    public long Frequency
    {
        get
        {
            return Interlocked.Read(ref _freq);
        }
        set
        {
            lock (this)
            {
                if (_freq != value)
                {
                    _freq = value;

                    Monitor.PulseAll(this);
                }
            }
        }
    }

    /// <summary>
    /// Stop flusher. Requests the stop if the loop is still running and blocks until
    /// the loop has marked itself as stopped, which only <c>Run</c> ever does.
    /// </summary>
    public void Stop()
    {
        lock (this)
        {
            if (_state == StateRunning)
            {
                _state = StateStopping;

                Monitor.PulseAll(this);
            }

            while (_state != StateStopped)
                Monitor.Wait(this);
        }
    }

    /// <summary>
    /// Runs the flusher thread.
    /// </summary>
    public void RunThread()
    {
        TaskRunner.Run(Run);
    }
}
#pragma warning restore 0420
}
}