blob: 431b1e8722717b52093b0b1c4c95f99c56576bc9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Impl.Client.Datastream
{
using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Impl.Common;
/// <summary>
/// Client data streamer buffer.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable",
Justification = "WaitHandle is not used in ReaderWriterLockSlim, no need to dispose.")]
internal sealed class DataStreamerClientBuffer<TK, TV>
{
/** Array vs concurrent data structures:
* - Can be written in parallel, since index is from Interlocked.Increment, no additional synchronization needed
* - Compact storage (contiguous memory block)
* - Less allocations
* - Easy pooling
*/
private readonly DataStreamerClientEntry<TK, TV>[] _entries;
/** */
private TaskCompletionSource<object> _flushCompletionSource;
/** */
private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
/** */
private readonly DataStreamerClientPerNodeBuffer<TK,TV> _parent;
/** Previous buffer in the chain. Buffer chain allows us to track previous flushes. */
private DataStreamerClientBuffer<TK,TV> _previous;
/** */
private long _size;
/** */
private volatile bool _flushing;
/** */
private volatile bool _flushed;
/// <summary>
/// Initializes a new instance of <see cref="DataStreamerClientBuffer{TK,TV}"/>.
/// </summary>
public DataStreamerClientBuffer(
DataStreamerClientEntry<TK,TV>[] entries,
DataStreamerClientPerNodeBuffer<TK, TV> parent,
DataStreamerClientBuffer<TK, TV> previous)
{
Debug.Assert(parent != null);
_entries = entries;
_parent = parent;
_previous = previous;
}
/// <summary>
/// Gets the entry count.
/// </summary>
public int Count
{
// Size can exceed entries length when multiple threads compete on Add.
get { return _size > _entries.Length ? _entries.Length : (int) _size; }
}
/// <summary>
/// Gets the flush task for this and all previous buffers.
/// </summary>
public Task GetChainFlushTask()
{
_rwLock.EnterWriteLock();
try
{
if (CheckChainFlushed())
{
return TaskRunner.CompletedTask;
}
var previous = _previous;
if (_flushed)
{
return previous == null
? TaskRunner.CompletedTask
: previous.GetChainFlushTask();
}
if (_flushCompletionSource == null)
{
_flushCompletionSource = new TaskCompletionSource<object>();
}
return previous == null
? _flushCompletionSource.Task
: TaskRunner.WhenAll(new[] {previous.GetChainFlushTask(), _flushCompletionSource.Task});
}
finally
{
_rwLock.ExitWriteLock();
}
}
/// <summary>
/// Gets the entries array.
/// </summary>
public DataStreamerClientEntry<TK, TV>[] Entries
{
get { return _entries; }
}
/// <summary>
/// Gets the previous buffer in the chain.
/// </summary>
public DataStreamerClientBuffer<TK, TV> Previous
{
get { return _previous; }
}
/// <summary>
/// Adds an entry to the buffer.
/// </summary>
/// <param name="entry">Entry.</param>
/// <returns>True when added successfully; false otherwise (buffer is full, flushing, flushed, or closed).</returns>
public bool Add(DataStreamerClientEntry<TK, TV> entry)
{
if (!_rwLock.TryEnterReadLock(0))
return false;
long newSize;
try
{
if (_flushing || _flushed)
{
return false;
}
newSize = Interlocked.Increment(ref _size);
if (newSize > _entries.Length)
{
return false;
}
_entries[newSize - 1] = entry;
}
finally
{
_rwLock.ExitReadLock();
}
if (newSize == _entries.Length)
{
TryStartFlush();
}
return true;
}
/// <summary>
/// Initiates the flush operation.
/// </summary>
public void ScheduleFlush()
{
if (Interlocked.CompareExchange(ref _size, -1, -1) == 0)
{
// Empty buffer.
return;
}
TryStartFlush();
}
/// <summary>
/// Marks this buffer as flushed.
/// </summary>
public bool MarkFlushed()
{
_rwLock.EnterWriteLock();
try
{
if (_flushing || _flushed)
{
return false;
}
_flushed = true;
return true;
}
finally
{
_rwLock.ExitWriteLock();
}
}
/** <inheritdoc /> */
public override string ToString()
{
return string.Format("{0} [Count={1}, Flushing={2}, Flushed={3}, Previous={4}]", GetType().Name, Count,
_flushing, _flushed, _previous);
}
/// <summary>
/// Attempts to start the flush operation.
/// </summary>
private void TryStartFlush()
{
_rwLock.EnterWriteLock();
try
{
if (_flushing || _flushed)
{
return;
}
_flushing = true;
if (Count > 0)
{
StartFlush();
}
else
{
OnFlushed();
}
}
finally
{
_rwLock.ExitWriteLock();
}
}
/// <summary>
/// Starts the flush operation.
/// </summary>
private void StartFlush()
{
// NOTE: Continuation runs on socket thread - set result on thread pool.
_parent.FlushBufferAsync(this).ContWith(
t => ThreadPool.QueueUserWorkItem(buf =>
((DataStreamerClientBuffer<TK, TV>)buf).OnFlushed(t.Exception), this),
TaskContinuationOptions.ExecuteSynchronously);
}
/// <summary>
/// Called when flush operation completes.
/// </summary>
private void OnFlushed(AggregateException exception = null)
{
TaskCompletionSource<object> tcs;
_rwLock.EnterWriteLock();
try
{
_flushed = true;
_flushing = false;
tcs = _flushCompletionSource;
}
finally
{
_rwLock.ExitWriteLock();
}
CheckChainFlushed();
if (tcs != null)
{
var previous = _previous;
if (previous == null)
{
TrySetResultOrException(tcs, exception);
}
else
{
previous.GetChainFlushTask().ContWith(
t => TrySetResultOrException(tcs, exception ?? t.Exception),
TaskContinuationOptions.ExecuteSynchronously);
}
}
}
/// <summary>
/// Checks if entire buffer chain is already flushed.
/// </summary>
private bool CheckChainFlushed()
{
var previous = _previous;
if (previous == null)
{
return _flushed;
}
if (!previous.CheckChainFlushed())
{
return false;
}
_previous = null;
return _flushed;
}
/// <summary>
/// Sets task completion source status.
/// </summary>
private static void TrySetResultOrException(TaskCompletionSource<object> tcs, Exception exception)
{
if (exception == null)
{
tcs.TrySetResult(null);
}
else
{
tcs.TrySetException(exception);
}
}
}
}