blob: 696a965d6107d923510c578f4a008ca6e18d1d7e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Internal.Sql
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Buffers;
using Common;
using Ignite.Sql;
using Ignite.Table;
using Ignite.Transactions;
using Linq;
using Proto;
using Proto.BinaryTuple;
using Proto.MsgPack;
using Transactions;
/// <summary>
/// SQL API.
/// </summary>
internal sealed class Sql : ISql
{
private static readonly RowReader<IIgniteTuple> TupleReader =
static (IReadOnlyList<IColumnMetadata> cols, ref BinaryTupleReader reader) => ReadTuple(cols, ref reader);
private static readonly RowReaderFactory<IIgniteTuple> TupleReaderFactory = static _ => TupleReader;
/** Underlying connection. */
private readonly ClientFailoverSocket _socket;
/// <summary>
/// Initializes a new instance of the <see cref="Sql"/> class.
/// </summary>
/// <param name="socket">Socket.</param>
public Sql(ClientFailoverSocket socket)
{
_socket = socket;
}
/// <inheritdoc/>
public async Task<IResultSet<IIgniteTuple>> ExecuteAsync(
ITransaction? transaction, SqlStatement statement, CancellationToken cancellationToken, params object?[]? args) =>
await ExecuteAsyncInternal(transaction, statement, TupleReaderFactory, args, cancellationToken).ConfigureAwait(false);
/// <inheritdoc/>
public async Task<IResultSet<T>> ExecuteAsync<T>(
ITransaction? transaction, SqlStatement statement, CancellationToken cancellationToken, params object?[]? args) =>
await ExecuteAsyncInternal(
transaction,
statement,
static cols => GetReaderFactory<T>(cols),
args,
cancellationToken)
.ConfigureAwait(false);
/// <inheritdoc/>
public async Task<IgniteDbDataReader> ExecuteReaderAsync(
ITransaction? transaction, SqlStatement statement, CancellationToken cancellationToken, params object?[]? args)
{
var resultSet = await ExecuteAsyncInternal<object>(
transaction, statement, _ => null!, args, cancellationToken).ConfigureAwait(false);
if (!resultSet.HasRowSet)
{
throw new InvalidOperationException($"{nameof(ExecuteReaderAsync)} does not support queries without row set (DDL, DML).");
}
return new IgniteDbDataReader(resultSet);
}
/// <inheritdoc/>
public async Task ExecuteScriptAsync(SqlStatement script, params object?[]? args)
{
await ExecuteScriptAsync(script, CancellationToken.None, args).ConfigureAwait(false);
}
/// <inheritdoc/>
public async Task ExecuteScriptAsync(SqlStatement script, CancellationToken cancellationToken, params object?[]? args)
{
IgniteArgumentCheck.NotNull(script);
using var bufferWriter = ProtoCommon.GetMessageWriter();
WriteStatement(bufferWriter, script, args);
try
{
using var buf = await _socket.DoOutInOpAsync(
ClientOp.SqlExecScript, bufferWriter, cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (SqlException e)
{
ConvertExceptionAndThrow(e, script, cancellationToken);
throw;
}
}
/// <inheritdoc/>
public async Task<long[]> ExecuteBatchAsync(
ITransaction? transaction,
SqlStatement statement,
IEnumerable<IEnumerable<object?>> args,
CancellationToken cancellationToken = default)
{
IgniteArgumentCheck.NotNull(statement);
IgniteArgumentCheck.NotNull(args);
cancellationToken.ThrowIfCancellationRequested();
Transaction? tx = await LazyTransaction.EnsureStartedAsync(transaction, _socket, default).ConfigureAwait(false);
using var bufferWriter = ProtoCommon.GetMessageWriter();
WriteStatement(bufferWriter, statement, tx, writeTx: true);
WriteBatchArgs(bufferWriter, args);
bufferWriter.MessageWriter.Write(_socket.ObservableTimestamp);
try
{
var (buf, _) = await _socket.DoOutInOpAndGetSocketAsync(
ClientOp.SqlExecBatch, tx, bufferWriter, cancellationToken: cancellationToken).ConfigureAwait(false);
using (buf)
{
return Read(buf);
}
}
catch (SqlBatchException e)
{
ConvertExceptionAndThrow(e, statement, cancellationToken);
throw;
}
static long[] Read(PooledBuffer resBuf)
{
var r = resBuf.GetReader();
r.Skip(4); // Unused values: resourceId, rowSet, morePages, wasApplied
int count = r.ReadInt32();
var affectedRows = new long[count];
for (var i = 0; i < count; i++)
{
affectedRows[i] = r.ReadInt64();
}
return affectedRows;
}
}
/// <inheritdoc/>
public override string ToString() => IgniteToStringBuilder.Build(GetType());
/// <summary>
/// Reads column value.
/// </summary>
/// <param name="reader">Reader.</param>
/// <param name="col">Column.</param>
/// <param name="idx">Index.</param>
/// <returns>Value.</returns>
internal static object? ReadColumnValue(ref BinaryTupleReader reader, IColumnMetadata col, int idx)
{
if (reader.IsNull(idx))
{
return null;
}
return col.Type switch
{
ColumnType.Null => null,
ColumnType.Boolean => reader.GetBool(idx),
ColumnType.Int8 => reader.GetByte(idx),
ColumnType.Int16 => reader.GetShort(idx),
ColumnType.Int32 => reader.GetInt(idx),
ColumnType.Int64 => reader.GetLong(idx),
ColumnType.Float => reader.GetFloat(idx),
ColumnType.Double => reader.GetDouble(idx),
ColumnType.Decimal => reader.GetBigDecimal(idx, col.Scale),
ColumnType.Date => reader.GetDate(idx),
ColumnType.Time => reader.GetTime(idx),
ColumnType.Datetime => reader.GetDateTime(idx),
ColumnType.Timestamp => reader.GetTimestamp(idx),
ColumnType.Uuid => reader.GetGuid(idx),
ColumnType.String => reader.GetString(idx),
ColumnType.ByteArray => reader.GetBytes(idx),
ColumnType.Period => reader.GetPeriod(idx),
ColumnType.Duration => reader.GetDuration(idx),
_ => throw new InvalidOperationException("Unknown SQL column type: " + col.Type)
};
}
/// <summary>
/// Executes single SQL statement and returns rows deserialized with the provided <paramref name="rowReaderFactory"/>.
/// </summary>
/// <param name="transaction">Optional transaction.</param>
/// <param name="statement">Statement to execute.</param>
/// <param name="rowReaderFactory">Row reader factory.</param>
/// <param name="args">Arguments for the statement.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <typeparam name="T">Row type.</typeparam>
/// <returns>SQL result set.</returns>
internal async Task<ResultSet<T>> ExecuteAsyncInternal<T>(
ITransaction? transaction,
SqlStatement statement,
RowReaderFactory<T> rowReaderFactory,
ICollection<object?>? args,
CancellationToken cancellationToken)
{
IgniteArgumentCheck.NotNull(statement);
cancellationToken.ThrowIfCancellationRequested();
Transaction? tx = await LazyTransaction.EnsureStartedAsync(transaction, _socket, default).ConfigureAwait(false);
using var bufferWriter = ProtoCommon.GetMessageWriter();
WriteStatement(bufferWriter, statement, args, tx, writeTx: true);
PooledBuffer? buf = null;
try
{
(buf, var socket) = await _socket.DoOutInOpAndGetSocketAsync(
ClientOp.SqlExec, tx, bufferWriter, cancellationToken: cancellationToken).ConfigureAwait(false);
// ResultSet will dispose the pooled buffer.
return new ResultSet<T>(socket, buf, rowReaderFactory, cancellationToken);
}
catch (SqlException e)
{
buf?.Dispose();
ConvertExceptionAndThrow(e, statement, cancellationToken);
throw;
}
catch (Exception)
{
buf?.Dispose();
throw;
}
}
private static void ConvertExceptionAndThrow(IgniteException e, SqlStatement statement, CancellationToken token)
{
switch (e.Code)
{
case ErrorGroups.Sql.StmtParse:
throw new SqlException(
e.TraceId,
ErrorGroups.Sql.StmtValidation,
"Invalid query, check inner exceptions for details: " + statement,
e);
case ErrorGroups.Sql.ExecutionCancelled:
var cancelledToken = token.IsCancellationRequested ? token : CancellationToken.None;
throw new OperationCanceledException(e.Message, e, cancelledToken);
}
}
private static void WriteProperties(SqlStatement statement, ref MsgPackWriter w)
{
var props = statement.Properties;
w.Write(props.Count);
using var propTuple = new BinaryTupleBuilder(props.Count * 4);
foreach (var (key, val) in props)
{
propTuple.AppendString(key);
propTuple.AppendObjectWithType(val);
}
w.Write(propTuple.Build().Span);
}
private static IgniteTuple ReadTuple(IReadOnlyList<IColumnMetadata> cols, ref BinaryTupleReader tupleReader)
{
var row = new IgniteTuple(cols.Count);
for (var i = 0; i < cols.Count; i++)
{
var col = cols[i];
row[col.Name] = ReadColumnValue(ref tupleReader, col, i);
}
return row;
}
private static RowReader<T> GetReaderFactory<T>(IReadOnlyList<IColumnMetadata> cols) =>
ResultSelector.Get<T>(cols, selectorExpression: null, ResultSelectorOptions.None);
private static void WriteBatchArgs(PooledArrayBuffer writer, IEnumerable<IEnumerable<object?>> args)
{
int rowSize = -1;
int rowCountPos = -1;
int rowCount = 0;
var w = writer.MessageWriter;
foreach (var arg in args)
{
IgniteArgumentCheck.NotNull(arg);
IEnumerable<object?> row = arg;
rowCount++;
if (rowSize < 0)
{
// First row, write header.
if (!row.TryGetNonEnumeratedCount(out rowSize))
{
var list = row.ToList();
rowSize = list.Count;
row = list;
}
IgniteArgumentCheck.Ensure(rowSize > 0, nameof(args), "Batch arguments must not contain empty rows.");
w.Write(rowSize);
rowCountPos = writer.ReserveMsgPackInt32();
w.Write(false); // Paged args.
}
w.WriteObjectEnumerableAsBinaryTuple(row, expectedCount: rowSize, errorPrefix: "Inconsistent batch argument size: ");
}
IgniteArgumentCheck.Ensure(rowCount > 0, nameof(args), "Batch arguments must not be empty.");
writer.WriteMsgPackInt32(rowCount, rowCountPos);
}
private static void WriteStatement(
PooledArrayBuffer writer,
SqlStatement statement,
Transaction? tx = null,
bool writeTx = false)
{
var w = writer.MessageWriter;
if (writeTx)
{
w.WriteTx(tx?.Id);
}
w.Write(statement.Schema);
w.Write(statement.PageSize);
w.Write((long)statement.Timeout.TotalMilliseconds);
w.WriteNil(); // Session timeout (unused, session is closed by the server immediately).
w.Write(statement.TimeZoneId);
WriteProperties(statement, ref w);
w.Write(statement.Query);
}
private void WriteStatement(
PooledArrayBuffer writer,
SqlStatement statement,
ICollection<object?>? args,
Transaction? tx = null,
bool writeTx = false)
{
var w = writer.MessageWriter;
WriteStatement(writer, statement, tx, writeTx);
w.WriteObjectCollectionWithCountAsBinaryTuple(args);
w.Write(_socket.ObservableTimestamp);
}
}
}