blob: 9b21a8a547dedc3a909a0363b684267148cf20d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Internal.Sql
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Buffers;
using Common;
using Ignite.Sql;
using Ignite.Table;
using Ignite.Transactions;
using Linq;
using Proto;
using Proto.BinaryTuple;
using Proto.MsgPack;
using Transactions;
/// <summary>
/// SQL API.
/// </summary>
internal sealed class Sql : ISql
{
private static readonly RowReader<IIgniteTuple> TupleReader =
static (IReadOnlyList<IColumnMetadata> cols, ref BinaryTupleReader reader) => ReadTuple(cols, ref reader);
private static readonly RowReaderFactory<IIgniteTuple> TupleReaderFactory = static _ => TupleReader;
/** Underlying connection. */
private readonly ClientFailoverSocket _socket;
/// <summary>
/// Initializes a new instance of the <see cref="Sql"/> class.
/// </summary>
/// <param name="socket">Socket.</param>
public Sql(ClientFailoverSocket socket)
{
_socket = socket;
}
/// <inheritdoc/>
public async Task<IResultSet<IIgniteTuple>> ExecuteAsync(ITransaction? transaction, SqlStatement statement, params object?[]? args) =>
await ExecuteAsyncInternal(transaction, statement, TupleReaderFactory, args).ConfigureAwait(false);
/// <inheritdoc/>
public async Task<IResultSet<T>> ExecuteAsync<T>(ITransaction? transaction, SqlStatement statement, params object?[]? args) =>
await ExecuteAsyncInternal(transaction, statement, static cols => GetReaderFactory<T>(cols), args)
.ConfigureAwait(false);
/// <inheritdoc/>
public async Task<IgniteDbDataReader> ExecuteReaderAsync(ITransaction? transaction, SqlStatement statement, params object?[]? args)
{
var resultSet = await ExecuteAsyncInternal<object>(transaction, statement, _ => null!, args).ConfigureAwait(false);
if (!resultSet.HasRowSet)
{
throw new InvalidOperationException($"{nameof(ExecuteReaderAsync)} does not support queries without row set (DDL, DML).");
}
return new IgniteDbDataReader(resultSet);
}
/// <inheritdoc/>
public async Task ExecuteScriptAsync(SqlStatement script, params object?[]? args)
{
IgniteArgumentCheck.NotNull(script);
using var bufferWriter = ProtoCommon.GetMessageWriter();
WriteStatement(bufferWriter, script, args);
try
{
using var buf = await _socket.DoOutInOpAsync(ClientOp.SqlExecScript, bufferWriter).ConfigureAwait(false);
}
catch (SqlException e) when (e.Code == ErrorGroups.Sql.StmtParse)
{
throw new SqlException(
e.TraceId,
ErrorGroups.Sql.StmtValidation,
"Invalid query, check inner exceptions for details: " + script,
e);
}
}
/// <inheritdoc/>
public override string ToString() => IgniteToStringBuilder.Build(GetType());
/// <summary>
/// Reads column value.
/// </summary>
/// <param name="reader">Reader.</param>
/// <param name="col">Column.</param>
/// <param name="idx">Index.</param>
/// <returns>Value.</returns>
internal static object? ReadColumnValue(ref BinaryTupleReader reader, IColumnMetadata col, int idx)
{
if (reader.IsNull(idx))
{
return null;
}
return col.Type switch
{
ColumnType.Null => null,
ColumnType.Boolean => reader.GetBool(idx),
ColumnType.Int8 => reader.GetByte(idx),
ColumnType.Int16 => reader.GetShort(idx),
ColumnType.Int32 => reader.GetInt(idx),
ColumnType.Int64 => reader.GetLong(idx),
ColumnType.Float => reader.GetFloat(idx),
ColumnType.Double => reader.GetDouble(idx),
ColumnType.Decimal => reader.GetDecimal(idx, col.Scale),
ColumnType.Date => reader.GetDate(idx),
ColumnType.Time => reader.GetTime(idx),
ColumnType.Datetime => reader.GetDateTime(idx),
ColumnType.Timestamp => reader.GetTimestamp(idx),
ColumnType.Uuid => reader.GetGuid(idx),
ColumnType.Bitmask => reader.GetBitmask(idx),
ColumnType.String => reader.GetString(idx),
ColumnType.ByteArray => reader.GetBytes(idx),
ColumnType.Period => reader.GetPeriod(idx),
ColumnType.Duration => reader.GetDuration(idx),
ColumnType.Number => reader.GetNumber(idx),
_ => throw new ArgumentOutOfRangeException(nameof(col.Type), col.Type, "Unknown SQL column type.")
};
}
/// <summary>
/// Executes single SQL statement and returns rows deserialized with the provided <paramref name="rowReaderFactory"/>.
/// </summary>
/// <param name="transaction">Optional transaction.</param>
/// <param name="statement">Statement to execute.</param>
/// <param name="rowReaderFactory">Row reader factory.</param>
/// <param name="args">Arguments for the statement.</param>
/// <typeparam name="T">Row type.</typeparam>
/// <returns>SQL result set.</returns>
internal async Task<ResultSet<T>> ExecuteAsyncInternal<T>(
ITransaction? transaction,
SqlStatement statement,
RowReaderFactory<T> rowReaderFactory,
ICollection<object?>? args)
{
IgniteArgumentCheck.NotNull(statement);
Transaction? tx = await LazyTransaction.EnsureStartedAsync(transaction, _socket, default).ConfigureAwait(false);
using var bufferWriter = ProtoCommon.GetMessageWriter();
WriteStatement(bufferWriter, statement, args, tx, writeTx: true);
PooledBuffer? buf = null;
try
{
(buf, var socket) = await _socket.DoOutInOpAndGetSocketAsync(ClientOp.SqlExec, tx, bufferWriter).ConfigureAwait(false);
// ResultSet will dispose the pooled buffer.
return new ResultSet<T>(socket, buf, rowReaderFactory);
}
catch (SqlException e) when (e.Code == ErrorGroups.Sql.StmtParse)
{
buf?.Dispose();
throw new SqlException(
e.TraceId,
ErrorGroups.Sql.StmtValidation,
"Invalid query, check inner exceptions for details: " + statement,
e);
}
catch (Exception)
{
buf?.Dispose();
throw;
}
}
private static void WriteProperties(SqlStatement statement, ref MsgPackWriter w)
{
var props = statement.Properties;
w.Write(props.Count);
using var propTuple = new BinaryTupleBuilder(props.Count * 4);
foreach (var (key, val) in props)
{
propTuple.AppendString(key);
propTuple.AppendObjectWithType(val);
}
w.Write(propTuple.Build().Span);
}
private static IIgniteTuple ReadTuple(IReadOnlyList<IColumnMetadata> cols, ref BinaryTupleReader tupleReader)
{
var row = new IgniteTuple(cols.Count);
for (var i = 0; i < cols.Count; i++)
{
var col = cols[i];
row[col.Name] = ReadColumnValue(ref tupleReader, col, i);
}
return row;
}
private static RowReader<T> GetReaderFactory<T>(IReadOnlyList<IColumnMetadata> cols) =>
ResultSelector.Get<T>(cols, selectorExpression: null, ResultSelectorOptions.None);
private void WriteStatement(
PooledArrayBuffer writer,
SqlStatement statement,
ICollection<object?>? args,
Transaction? tx = null,
bool writeTx = false)
{
var w = writer.MessageWriter;
if (writeTx)
{
w.WriteTx(tx?.Id);
}
w.Write(statement.Schema);
w.Write(statement.PageSize);
w.Write((long)statement.Timeout.TotalMilliseconds);
w.WriteNil(); // Session timeout (unused, session is closed by the server immediately).
WriteProperties(statement, ref w);
w.Write(statement.Query);
w.WriteObjectCollectionAsBinaryTuple(args);
w.Write(_socket.ObservableTimestamp);
}
}
}