blob: a40938e901dbaa1d8af2b893891aba88881a456d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Internal.Sql
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Buffers;
using Common;
using Ignite.Sql;
using Proto;
using Proto.BinaryTuple;
using Proto.MsgPack;
/// <summary>
/// SQL result set.
/// </summary>
/// <typeparam name="T">Result type.</typeparam>
internal sealed class ResultSet<T> : IResultSet<T>
{
private readonly ClientSocket _socket;
private readonly long? _resourceId;
private readonly PooledBuffer? _buffer;
private readonly bool _hasMorePages;
private readonly RowReader<T>? _rowReader;
private readonly CancellationToken _cancellationToken;
private bool _resourceClosed;
private int _bufferReleased;
private bool _iterated;
/// <summary>
/// Initializes a new instance of the <see cref="ResultSet{T}"/> class.
/// </summary>
/// <param name="socket">Socket.</param>
/// <param name="buf">Buffer to read initial data from.</param>
/// <param name="rowReaderFactory">Row reader factory.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public ResultSet(ClientSocket socket, PooledBuffer buf, RowReaderFactory<T> rowReaderFactory, CancellationToken cancellationToken)
{
_socket = socket;
_cancellationToken = cancellationToken;
var reader = buf.GetReader();
// ReSharper disable once RedundantCast (required on .NET Core 3.1).
_resourceId = reader.TryReadNil() ? (long?)null : reader.ReadInt64();
HasRowSet = reader.ReadBoolean();
_hasMorePages = reader.ReadBoolean();
WasApplied = reader.ReadBoolean();
AffectedRows = reader.ReadInt64();
Metadata = HasRowSet ? ReadMeta(ref reader) : null;
_rowReader = Metadata != null ? rowReaderFactory(Metadata.Columns) : null;
if (HasRowSet)
{
buf.Position += reader.Consumed;
_buffer = buf;
HasRows = reader.ReadInt32() > 0;
}
else
{
buf.Dispose();
_bufferReleased = 1;
_resourceClosed = true;
}
}
/// <summary>
/// Finalizes an instance of the <see cref="ResultSet{T}"/> class.
/// </summary>
~ResultSet()
{
Dispose();
}
/// <inheritdoc/>
public IResultSetMetadata? Metadata { get; }
/// <inheritdoc/>
public bool HasRowSet { get; }
/// <inheritdoc/>
public long AffectedRows { get; }
/// <inheritdoc/>
public bool WasApplied { get; }
/// <summary>
/// Gets a value indicating whether this instance is disposed.
/// </summary>
internal bool IsDisposed => (_resourceId == null || _resourceClosed) && _bufferReleased > 0;
/// <summary>
/// Gets a value indicating whether this result set has any rows in it.
/// </summary>
internal bool HasRows { get; }
/// <inheritdoc/>
public async ValueTask<List<T>> ToListAsync() =>
await CollectAsync(
constructor: static capacity => new List<T>(capacity),
accumulator: static (list, item) => list.Add(item))
.ConfigureAwait(false);
/// <inheritdoc/>
[SuppressMessage("ReSharper", "InconsistentNaming", Justification = "Generics.")]
public async ValueTask<Dictionary<TK, TV>> ToDictionaryAsync<TK, TV>(
Func<T, TK> keySelector,
Func<T, TV> valSelector,
IEqualityComparer<TK>? comparer)
where TK : notnull
{
IgniteArgumentCheck.NotNull(keySelector);
IgniteArgumentCheck.NotNull(valSelector);
return await CollectAsync(
constructor: capacity => new Dictionary<TK, TV>(capacity, comparer),
accumulator: (dictionary, item) => dictionary.Add(keySelector(item), valSelector(item)))
.ConfigureAwait(false);
}
/// <inheritdoc/>
public async ValueTask<TResult> CollectAsync<TResult>(Func<int, TResult> constructor, Action<TResult, T> accumulator)
{
IgniteArgumentCheck.NotNull(constructor);
IgniteArgumentCheck.NotNull(accumulator);
ValidateAndSetIteratorState();
// First page is included in the initial response.
var cols = Metadata!.Columns;
var hasMore = _hasMorePages;
TResult? res = default;
ReadPage(_buffer!);
ReleaseBuffer();
while (hasMore)
{
using var pageBuf = await FetchNextPage().ConfigureAwait(false);
ReadPage(pageBuf);
}
_resourceClosed = true;
return res!;
void ReadPage(PooledBuffer buf)
{
var reader = buf.GetReader();
var pageSize = reader.ReadInt32();
var capacity = hasMore ? pageSize * 2 : pageSize;
res ??= constructor(capacity);
for (var rowIdx = 0; rowIdx < pageSize; rowIdx++)
{
var row = ReadRow(cols, ref reader);
accumulator(res, row);
}
if (!reader.End)
{
hasMore = reader.ReadBoolean();
}
}
}
/// <inheritdoc/>
[SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly", Justification = "SuppressFinalize in DisposeAsync")]
public void Dispose()
{
DisposeAsync().AsTask().GetAwaiter().GetResult();
}
/// <inheritdoc/>
[SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Dispose should not throw.")]
public async ValueTask DisposeAsync()
{
ReleaseBuffer();
if (_resourceId != null && !_resourceClosed)
{
try
{
using var writer = ProtoCommon.GetMessageWriter();
WriteId(writer.MessageWriter);
// Cursor close should never be cancelled.
using var buffer = await _socket.DoOutInOpAsync(
ClientOp.SqlCursorClose, writer, cancellationToken: CancellationToken.None).ConfigureAwait(false);
}
catch (Exception)
{
// Ignore.
// Socket might be disconnected.
}
_resourceClosed = true;
}
GC.SuppressFinalize(this);
}
/// <inheritdoc/>
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
ValidateAndSetIteratorState();
return EnumerateRows().GetAsyncEnumerator(cancellationToken);
}
/// <inheritdoc/>
public override string ToString() =>
new IgniteToStringBuilder(GetType())
.Append(HasRowSet)
.Append(AffectedRows)
.Append(WasApplied)
.Append(Metadata)
.Build();
/// <summary>
/// Enumerates ResultSet pages.
/// </summary>
/// <returns>ResultSet pages.</returns>
internal async IAsyncEnumerable<PooledBuffer> EnumeratePagesInternal()
{
ValidateAndSetIteratorState();
yield return _buffer!;
ReleaseBuffer();
if (!_hasMorePages)
{
yield break;
}
while (true)
{
using var buffer = await FetchNextPage().ConfigureAwait(false);
yield return buffer;
if (!HasMore(buffer))
{
break;
}
}
static bool HasMore(PooledBuffer buf)
{
var reader = buf.GetReader();
var rowCount = reader.ReadInt32();
reader.Skip(rowCount);
return !reader.End && reader.ReadBoolean();
}
}
private static ResultSetMetadata ReadMeta(ref MsgPackReader reader)
{
var size = reader.ReadInt32();
var columns = new List<IColumnMetadata>(size);
for (int i = 0; i < size; i++)
{
var propertyCount = reader.ReadInt32();
const int minCount = 6;
Debug.Assert(propertyCount >= minCount, "propertyCount >= " + minCount);
var name = reader.ReadString();
var nullable = reader.ReadBoolean();
var type = (ColumnType)reader.ReadInt32();
var scale = reader.ReadInt32();
var precision = reader.ReadInt32();
var origin = reader.ReadBoolean()
? new ColumnOrigin(
ColumnName: reader.TryReadNil() ? name : reader.ReadString(),
SchemaName: reader.TryReadInt(out var idx) ? columns[idx].Origin!.SchemaName : reader.ReadString(),
TableName: reader.TryReadInt(out idx) ? columns[idx].Origin!.TableName : reader.ReadString())
: null;
columns.Add(new ColumnMetadata(name, type, precision, scale, nullable, origin));
}
return new ResultSetMetadata(columns);
}
private T ReadRow(IReadOnlyList<IColumnMetadata> cols, ref MsgPackReader reader)
{
var tupleReader = new BinaryTupleReader(reader.ReadBinary(), cols.Count);
return _rowReader!(cols, ref tupleReader);
}
private async IAsyncEnumerable<T> EnumerateRows()
{
var hasMore = _hasMorePages;
var cols = Metadata!.Columns;
var offset = 0;
// First page.
foreach (var row in EnumeratePage(_buffer!))
{
yield return row;
}
ReleaseBuffer();
// Next pages.
while (hasMore)
{
using var buffer = await FetchNextPage().ConfigureAwait(false);
offset = 0;
foreach (var row in EnumeratePage(buffer))
{
yield return row;
}
}
_resourceClosed = true;
IEnumerable<T> EnumeratePage(PooledBuffer buf)
{
// ReSharper disable AccessToModifiedClosure
var reader = buf.GetReader(offset);
var pageSize = reader.ReadInt32();
offset += reader.Consumed;
for (var rowIdx = 0; rowIdx < pageSize; rowIdx++)
{
_cancellationToken.ThrowIfCancellationRequested();
// Can't use ref struct reader from above inside iterator block (CS4013).
// Use a new reader for every row (stack allocated).
var rowReader = buf.GetReader(offset);
var row = ReadRow(cols, ref rowReader);
offset += rowReader.Consumed;
yield return row;
}
reader = buf.GetReader(offset);
if (!reader.End)
{
hasMore = reader.ReadBoolean();
}
}
}
private async Task<PooledBuffer> FetchNextPage()
{
using var writer = ProtoCommon.GetMessageWriter();
WriteId(writer.MessageWriter);
return await _socket.DoOutInOpAsync(ClientOp.SqlCursorNextPage, writer, cancellationToken: _cancellationToken)
.ConfigureAwait(false);
}
private void WriteId(MsgPackWriter writer)
{
var resourceId = _resourceId;
Debug.Assert(resourceId != null, "resourceId != null");
ObjectDisposedException.ThrowIf(_resourceClosed, this);
writer.Write(_resourceId!.Value);
}
private void ValidateAndSetIteratorState()
{
if (!HasRowSet)
{
throw new IgniteClientException(ErrorGroups.Sql.QueryNoResultSet, "Query has no result set.");
}
if (_iterated)
{
throw new IgniteClientException(
ErrorGroups.Common.CursorAlreadyClosed,
"Query result set can not be iterated more than once.");
}
_iterated = true;
}
private void ReleaseBuffer()
{
// ResultSet is not thread safe, so we don't need Interlocked with correct usage.
// However, double release of pooled buffers is very dangerous, so we protect against that anyway.
if (Interlocked.CompareExchange(ref _bufferReleased, 1, 0) == 0)
{
_buffer?.Dispose();
}
}
}
}