blob: 98e4b47f1ede895bfbfb28ffae81a73c7264a131 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Flight.Client;
using Arrow.Flight.Protocol.Sql;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
namespace Apache.Arrow.Flight.Sql.Client;
public class FlightSqlClient
{
private readonly FlightClient _client;
public FlightSqlClient(FlightClient client)
{
_client = client;
}
/// <summary>
/// Execute a SQL query on the server.
/// </summary>
/// <param name="query">The UTF8-encoded SQL query to be executed.</param>
/// <param name="transaction">A transaction to associate this query with.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> ExecuteAsync(
string query,
Transaction transaction = default,
FlightCallOptions? options = null,
CancellationToken cancellationToken = default)
{
if (transaction == default)
{
transaction = Transaction.NoTransaction;
}
if (string.IsNullOrEmpty(query))
{
throw new ArgumentException($"Query cannot be null or empty: {nameof(query)}");
}
try
{
var commandQuery = new CommandStatementQuery { Query = query };
if (transaction.IsValid)
{
commandQuery.TransactionId = transaction.TransactionId;
}
var descriptor = FlightDescriptor.CreateCommandDescriptor(commandQuery.PackAndSerialize());
return await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to execute query", ex);
}
}
/// <summary>
/// Executes an update SQL command and returns the number of affected rows.
/// </summary>
/// <param name="query">The UTF8-encoded SQL query to be executed.</param>
/// <param name="transaction">A transaction to associate this query with. Defaults to no transaction if not provided.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The number of rows affected by the operation.</returns>
public async Task<long> ExecuteUpdateAsync(
string query,
Transaction transaction = default,
FlightCallOptions? options = null,
CancellationToken cancellationToken = default)
{
if (transaction == default)
{
transaction = Transaction.NoTransaction;
}
if (string.IsNullOrEmpty(query))
{
throw new ArgumentException("Query cannot be null or empty", nameof(query));
}
try
{
var updateRequestCommand =
new ActionCreatePreparedStatementRequest { Query = query, TransactionId = transaction.TransactionId };
byte[] serializedUpdateRequestCommand = updateRequestCommand.PackAndSerialize();
var action = new FlightAction(SqlAction.CreateRequest, serializedUpdateRequestCommand);
var call = DoActionAsync(action, options, cancellationToken);
long affectedRows = 0;
await foreach (var result in call.ConfigureAwait(false))
{
var preparedStatementResponse = result.Body.ParseAndUnpack<ActionCreatePreparedStatementResult>();
var command = new CommandPreparedStatementQuery
{
PreparedStatementHandle = preparedStatementResponse.PreparedStatementHandle
};
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var flightInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken);
var doGetResult = DoGetAsync(flightInfo.Endpoints[0].Ticket, options, cancellationToken);
await foreach (var recordBatch in doGetResult.ConfigureAwait(false))
{
affectedRows += recordBatch.ExtractRowCount();
}
}
return affectedRows;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to execute update query", ex);
}
}
/// <summary>
/// Asynchronously retrieves flight information for a given flight descriptor.
/// </summary>
/// <param name="descriptor">The descriptor of the dataset request, whether a named dataset or a command.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task that represents the asynchronous operation. The task result contains the FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetFlightInfoAsync(
FlightDescriptor descriptor,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (descriptor is null)
{
throw new ArgumentNullException(nameof(descriptor));
}
try
{
var flightInfoCall = _client.GetInfo(descriptor, options?.Headers, null, cancellationToken);
var flightInfo = await flightInfoCall.ResponseAsync.ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get flight info", ex);
}
}
/// <summary>
/// Perform the indicated action, returning an iterator to the stream of results, if any.
/// </summary>
/// <param name="action">The action to be performed</param>
/// <param name="options">Per-RPC options</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>An async enumerable of results</returns>
public async IAsyncEnumerable<FlightResult> DoActionAsync(
FlightAction action,
FlightCallOptions? options = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (action is null)
throw new ArgumentNullException(nameof(action));
var call = _client.DoAction(action, options?.Headers, null, cancellationToken);
await foreach (var result in call.ResponseStream.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
yield return result;
}
}
/// <summary>
/// Get the result set schema from the server for the given query.
/// </summary>
/// <param name="query">The UTF8-encoded SQL query</param>
/// <param name="transaction">A transaction to associate this query with</param>
/// <param name="options">Per-RPC options</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The SchemaResult describing the schema of the result set</returns>
public async Task<Schema> GetExecuteSchemaAsync(
string query, Transaction transaction = default,
FlightCallOptions? options = null,
CancellationToken cancellationToken = default)
{
if (transaction == default)
{
transaction = Transaction.NoTransaction;
}
if (string.IsNullOrEmpty(query))
throw new ArgumentException($"Query cannot be null or empty: {nameof(query)}");
try
{
var prepareStatementRequest =
new ActionCreatePreparedStatementRequest { Query = query, TransactionId = transaction.TransactionId };
var action = new FlightAction(SqlAction.CreateRequest, prepareStatementRequest.PackAndSerialize());
var call = _client.DoAction(action, options?.Headers, null, cancellationToken);
var preparedStatementResponse = await ReadPreparedStatementAsync(call, cancellationToken).ConfigureAwait(false);
if (preparedStatementResponse.PreparedStatementHandle.IsEmpty)
throw new InvalidOperationException("Received an empty or invalid PreparedStatementHandle.");
var commandSqlCall = new CommandPreparedStatementQuery
{
PreparedStatementHandle = preparedStatementResponse.PreparedStatementHandle
};
var descriptor = FlightDescriptor.CreateCommandDescriptor(commandSqlCall.PackAndSerialize());
var schemaResult = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return schemaResult.Schema;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get execute schema", ex);
}
}
/// <summary>
/// Request a list of catalogs.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetCatalogsAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var command = new CommandGetCatalogs();
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var catalogsInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return catalogsInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get catalogs", ex);
}
}
/// <summary>
/// Get the catalogs schema from the server.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The SchemaResult describing the schema of the catalogs.</returns>
public async Task<Schema> GetCatalogsSchemaAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var commandGetCatalogsSchema = new CommandGetCatalogs();
var descriptor = FlightDescriptor.CreateCommandDescriptor(commandGetCatalogsSchema.PackAndSerialize());
var schemaResult = await GetSchemaAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return schemaResult;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get catalogs schema", ex);
}
}
/// <summary>
/// Asynchronously retrieves schema information for a given flight descriptor.
/// </summary>
/// <param name="descriptor">The descriptor of the dataset request, whether a named dataset or a command.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task that represents the asynchronous operation. The task result contains the SchemaResult describing the dataset schema.</returns>
public virtual async Task<Schema> GetSchemaAsync(
FlightDescriptor descriptor,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (descriptor is null)
{
throw new ArgumentNullException(nameof(descriptor));
}
try
{
var schemaResultCall = _client.GetSchema(descriptor, options?.Headers, null, cancellationToken);
var schemaResult = await schemaResultCall.ResponseAsync.ConfigureAwait(false);
return schemaResult;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get schema", ex);
}
}
/// <summary>
/// Request a list of database schemas.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <param name="catalog">The catalog.</param>
/// <param name="dbSchemaFilterPattern">The schema filter pattern.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetDbSchemasAsync(
string? catalog = null,
string? dbSchemaFilterPattern = null,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
try
{
var command = new CommandGetDbSchemas();
if (catalog != null)
{
command.Catalog = catalog;
}
if (dbSchemaFilterPattern != null)
{
command.DbSchemaFilterPattern = dbSchemaFilterPattern;
}
byte[] serializedAndPackedCommand = command.PackAndSerialize();
var descriptor = FlightDescriptor.CreateCommandDescriptor(serializedAndPackedCommand);
var flightInfoCall = GetFlightInfoAsync(descriptor, options, cancellationToken);
var flightInfo = await flightInfoCall.ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get database schemas", ex);
}
}
/// <summary>
/// Get the database schemas schema from the server.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>The SchemaResult describing the schema of the database schemas.</returns>
public async Task<Schema> GetDbSchemasSchemaAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var command = new CommandGetDbSchemas();
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var schemaResult = await GetSchemaAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return schemaResult;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get database schemas schema", ex);
}
}
/// <summary>
/// Given a flight ticket and schema, request to be sent the stream. Returns record batch stream reader.
/// </summary>
/// <param name="ticket">The flight ticket to use</param>
/// <param name="options">Per-RPC options</param>
/// <returns>The returned RecordBatchReader</returns>
public async IAsyncEnumerable<RecordBatch> DoGetAsync(
FlightTicket ticket,
FlightCallOptions? options = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (ticket == null)
{
throw new ArgumentNullException(nameof(ticket));
}
var call = _client.GetStream(ticket, options?.Headers);
await foreach (var recordBatch in call.ResponseStream.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
yield return recordBatch;
}
}
/// <summary>
/// Upload data to a Flight described by the given descriptor. The caller must call Close() on the returned stream
/// once they are done writing.
/// </summary>
/// <param name="descriptor">The descriptor of the stream.</param>
/// <param name="recordBatch">The record for the data to upload.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>A Task representing the asynchronous operation. The task result contains a DoPutResult struct holding a reader and a writer.</returns>
public async Task<FlightPutResult> DoPutAsync(
FlightDescriptor descriptor,
RecordBatch recordBatch,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (descriptor is null)
throw new ArgumentNullException(nameof(descriptor));
if (recordBatch is null)
throw new ArgumentNullException(nameof(recordBatch));
try
{
var doPutResult = _client.StartPut(descriptor, options?.Headers, null, cancellationToken);
var writer = doPutResult.RequestStream;
var reader = doPutResult.ResponseStream;
if (recordBatch == null || recordBatch.Length == 0)
throw new InvalidOperationException("RecordBatch is empty or improperly initialized.");
await writer.WriteAsync(recordBatch).ConfigureAwait(false);
await writer.CompleteAsync().ConfigureAwait(false);
if (await reader.MoveNext(cancellationToken).ConfigureAwait(false))
{
var putResult = reader.Current;
return new FlightPutResult(putResult.ApplicationMetadata);
}
return FlightPutResult.Empty;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to perform DoPut operation", ex);
}
}
/// <summary>
/// Request the primary keys for a table.
/// </summary>
/// <param name="tableRef">The table reference.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetPrimaryKeysAsync(TableRef tableRef, FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
if (tableRef == null)
throw new ArgumentNullException(nameof(tableRef));
try
{
var getPrimaryKeysRequest = new CommandGetPrimaryKeys
{
Catalog = tableRef.Catalog ?? string.Empty,
DbSchema = tableRef.DbSchema,
Table = tableRef.Table
};
var descriptor = FlightDescriptor.CreateCommandDescriptor(getPrimaryKeysRequest.PackAndSerialize());
var flightInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get primary keys", ex);
}
}
/// <summary>
/// Request a list of tables.
/// </summary>
/// <param name="catalog">The catalog.</param>
/// <param name="dbSchemaFilterPattern">The schema filter pattern.</param>
/// <param name="tableFilterPattern">The table filter pattern.</param>
/// <param name="includeSchema">True to include the schema upon return, false to not include the schema.</param>
/// <param name="tableTypes">The table types to include.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<IEnumerable<FlightInfo>> GetTablesAsync(
string? catalog = null,
string? dbSchemaFilterPattern = null,
string? tableFilterPattern = null,
bool includeSchema = false,
IEnumerable<string>? tableTypes = null,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
var command = new CommandGetTables
{
Catalog = catalog ?? string.Empty,
DbSchemaFilterPattern = dbSchemaFilterPattern ?? string.Empty,
TableNameFilterPattern = tableFilterPattern ?? string.Empty,
IncludeSchema = includeSchema
};
if (tableTypes != null)
{
command.TableTypes.AddRange(tableTypes);
}
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var flightInfoCall = GetFlightInfoAsync(descriptor, options, cancellationToken);
var flightInfo = await flightInfoCall.ConfigureAwait(false);
var flightInfos = new List<FlightInfo> { flightInfo };
return flightInfos;
}
/// <summary>
/// Retrieves a description about the foreign key columns that reference the primary key columns of the given table.
/// </summary>
/// <param name="tableRef">The table reference.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetExportedKeysAsync(
TableRef tableRef,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (tableRef == null)
throw new ArgumentNullException(nameof(tableRef));
try
{
var getExportedKeysRequest = new CommandGetExportedKeys
{
Catalog = tableRef.Catalog ?? string.Empty,
DbSchema = tableRef.DbSchema,
Table = tableRef.Table
};
var descriptor = FlightDescriptor.CreateCommandDescriptor(getExportedKeysRequest.PackAndSerialize());
var flightInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get exported keys", ex);
}
}
/// <summary>
/// Get the exported keys schema from the server.
/// </summary>
/// <param name="tableRef">The table reference.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The SchemaResult describing the schema of the exported keys.</returns>
public async Task<Schema> GetExportedKeysSchemaAsync(
TableRef tableRef,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
try
{
var commandGetExportedKeysSchema = new CommandGetExportedKeys
{
Catalog = tableRef.Catalog ?? string.Empty,
DbSchema = tableRef.DbSchema,
Table = tableRef.Table
};
var descriptor = FlightDescriptor.CreateCommandDescriptor(commandGetExportedKeysSchema.PackAndSerialize());
var schemaResult = await GetSchemaAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return schemaResult;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get exported keys schema", ex);
}
}
/// <summary>
/// Retrieves the foreign key columns for the given table.
/// </summary>
/// <param name="tableRef">The table reference.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetImportedKeysAsync(
TableRef tableRef,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (tableRef == null)
throw new ArgumentNullException(nameof(tableRef));
try
{
var getImportedKeysRequest = new CommandGetImportedKeys
{
Catalog = tableRef.Catalog ?? string.Empty,
DbSchema = tableRef.DbSchema,
Table = tableRef.Table
};
var descriptor = FlightDescriptor.CreateCommandDescriptor(getImportedKeysRequest.PackAndSerialize());
var flightInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get imported keys", ex);
}
}
/// <summary>
/// Get the imported keys schema from the server.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The SchemaResult describing the schema of the imported keys.</returns>
public async Task<Schema> GetImportedKeysSchemaAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var commandGetImportedKeysSchema = new CommandGetImportedKeys();
var descriptor = FlightDescriptor.CreateCommandDescriptor(commandGetImportedKeysSchema.PackAndSerialize());
var schemaResult = await GetSchemaAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return schemaResult;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get imported keys schema", ex);
}
}
/// <summary>
/// Retrieves a description of the foreign key columns in the given foreign key table that reference the primary key or the columns representing a unique constraint of the parent table.
/// </summary>
/// <param name="pkTableRef">The table reference that exports the key.</param>
/// <param name="fkTableRef">The table reference that imports the key.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetCrossReferenceAsync(
TableRef pkTableRef,
TableRef fkTableRef,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (pkTableRef == null)
throw new ArgumentNullException(nameof(pkTableRef));
if (fkTableRef == null)
throw new ArgumentNullException(nameof(fkTableRef));
try
{
var commandGetCrossReference = new CommandGetCrossReference
{
PkCatalog = pkTableRef.Catalog ?? string.Empty,
PkDbSchema = pkTableRef.DbSchema,
PkTable = pkTableRef.Table,
FkCatalog = fkTableRef.Catalog ?? string.Empty,
FkDbSchema = fkTableRef.DbSchema,
FkTable = fkTableRef.Table
};
var descriptor = FlightDescriptor.CreateCommandDescriptor(commandGetCrossReference.PackAndSerialize());
var flightInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get cross reference", ex);
}
}
/// <summary>
/// Get the cross-reference schema from the server.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The SchemaResult describing the schema of the cross-reference.</returns>
public async Task<Schema> GetCrossReferenceSchemaAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var commandGetCrossReferenceSchema = new CommandGetCrossReference();
var descriptor =
FlightDescriptor.CreateCommandDescriptor(commandGetCrossReferenceSchema.PackAndSerialize());
var schemaResultCall = GetSchemaAsync(descriptor, options, cancellationToken);
var schemaResult = await schemaResultCall.ConfigureAwait(false);
return schemaResult;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get cross-reference schema", ex);
}
}
/// <summary>
/// Request a list of table types.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetTableTypesAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var command = new CommandGetTableTypes();
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var flightInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get table types", ex);
}
}
/// <summary>
/// Get the table types schema from the server.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The SchemaResult describing the schema of the table types.</returns>
public async Task<Schema> GetTableTypesSchemaAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var command = new CommandGetTableTypes();
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var schemaResult = await GetSchemaAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return schemaResult;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get table types schema", ex);
}
}
/// <summary>
/// Request the information about all the data types supported with filtering by data type.
/// </summary>
/// <param name="dataType">The data type to search for as filtering.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetXdbcTypeInfoAsync(int dataType, FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var command = new CommandGetXdbcTypeInfo { DataType = dataType };
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var flightInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get XDBC type info", ex);
}
}
/// <summary>
/// Request the information about all the data types supported.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetXdbcTypeInfoAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var command = new CommandGetXdbcTypeInfo();
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var flightInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get XDBC type info", ex);
}
}
/// <summary>
/// Get the type info schema from the server.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The SchemaResult describing the schema of the type info.</returns>
public async Task<Schema> GetXdbcTypeInfoSchemaAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var command = new CommandGetXdbcTypeInfo();
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var schemaResult = await GetSchemaAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return schemaResult;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get XDBC type info schema", ex);
}
}
/// <summary>
/// Request a list of SQL information.
/// </summary>
/// <param name="sqlInfo">The SQL info required.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> GetSqlInfoAsync(
List<int>? sqlInfo = default,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
sqlInfo ??= new List<int>();
try
{
var command = new CommandGetSqlInfo();
command.Info.AddRange(sqlInfo.ConvertAll(item => (uint)item));
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var flightInfo = await GetFlightInfoAsync(descriptor, options, cancellationToken).ConfigureAwait(false);
return flightInfo;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get SQL info", ex);
}
}
/// <summary>
/// Get the SQL information schema from the server.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The SchemaResult describing the schema of the SQL information.</returns>
public async Task<Schema> GetSqlInfoSchemaAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var command = new CommandGetSqlInfo();
var descriptor = FlightDescriptor.CreateCommandDescriptor(command.PackAndSerialize());
var schemaResultCall = _client.GetSchema(descriptor, options?.Headers, null, cancellationToken);
var schemaResult = await schemaResultCall.ResponseAsync.ConfigureAwait(false);
return schemaResult;
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to get SQL info schema", ex);
}
}
/// <summary>
/// Explicitly cancel a FlightInfo.
/// </summary>
/// <param name="request">The CancelFlightInfoRequest.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>A Task representing the asynchronous operation. The task result contains the CancelFlightInfoResult describing the canceled result.</returns>
public async Task<FlightInfoCancelResult> CancelFlightInfoAsync(
FlightInfoCancelRequest request,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (request == null) throw new ArgumentNullException(nameof(request));
try
{
var action = new FlightAction(SqlAction.CancelFlightInfoRequest, request.PackAndSerialize());
var call = _client.DoAction(action, options?.Headers, null, cancellationToken);
await foreach (var result in call.ResponseStream.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
if (Any.Parser.ParseFrom(result.Body) is Any anyResult &&
anyResult.TryUnpack(out FlightInfoCancelResult cancelResult))
{
return cancelResult;
}
}
throw new InvalidOperationException("No response received for the CancelFlightInfo request.");
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to cancel flight info", ex);
}
}
/// <summary>
/// Explicitly cancel a query.
/// </summary>
/// <param name="info">The FlightInfo of the query to cancel.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
public async Task<FlightInfoCancelResult> CancelQueryAsync(
FlightInfo info,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (info == null)
throw new ArgumentNullException(nameof(info));
try
{
var cancelQueryRequest = new FlightInfoCancelRequest(info);
var cancelQueryAction =
new FlightAction(SqlAction.CancelFlightInfoRequest, cancelQueryRequest.PackAndSerialize());
var cancelQueryCall = _client.DoAction(cancelQueryAction, options?.Headers, null, cancellationToken);
await foreach (var result in cancelQueryCall.ResponseStream.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
if (Any.Parser.ParseFrom(result.Body) is Any anyResult &&
anyResult.TryUnpack(out FlightInfoCancelResult cancelResult))
{
return cancelResult;
}
}
throw new InvalidOperationException("Failed to cancel query: No response received.");
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to cancel query", ex);
}
}
/// <summary>
/// Begin a new transaction.
/// </summary>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>A Task representing the asynchronous operation. The task result contains the Transaction object representing the new transaction.</returns>
public async Task<Transaction> BeginTransactionAsync(FlightCallOptions? options = default, CancellationToken cancellationToken = default)
{
try
{
var actionBeginTransaction = new ActionBeginTransactionRequest();
var action = new FlightAction(SqlAction.BeginTransactionRequest, actionBeginTransaction.PackAndSerialize());
var responseStream = _client.DoAction(action, options?.Headers, null, cancellationToken);
await foreach (var result in responseStream.ResponseStream.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
string? beginTransactionResult = result.Body.ToStringUtf8();
return new Transaction(beginTransactionResult);
}
throw new InvalidOperationException("Failed to begin transaction: No response received.");
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to begin transaction", ex);
}
}
/// <summary>
/// Commit a transaction.
/// After this, the transaction and all associated savepoints will be invalidated.
/// </summary>
/// <param name="transaction">The transaction.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
public AsyncServerStreamingCall<FlightResult> CommitAsync(
Transaction transaction,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (transaction == null)
throw new ArgumentNullException(nameof(transaction));
try
{
var actionCommit = new FlightAction(SqlAction.CommitRequest, transaction.TransactionId);
return _client.DoAction(actionCommit, options?.Headers, null, cancellationToken);
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to commit transaction", ex);
}
}
/// <summary>
/// Rollback a transaction.
/// After this, the transaction and all associated savepoints will be invalidated.
/// </summary>
/// <param name="transaction">The transaction to rollback.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
public AsyncServerStreamingCall<FlightResult> RollbackAsync(
Transaction transaction,
FlightCallOptions? options = default,
CancellationToken cancellationToken = default)
{
if (transaction == null)
{
throw new ArgumentNullException(nameof(transaction));
}
try
{
var actionRollback = new FlightAction(SqlAction.RollbackRequest, transaction.TransactionId);
return _client.DoAction(actionRollback, options?.Headers, null, cancellationToken);
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to rollback transaction", ex);
}
}
/// <summary>
/// Create a prepared statement object.
/// </summary>
/// <param name="query">The query that will be executed.</param>
/// <param name="transaction">A transaction to associate this query with.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The created prepared statement.</returns>
public async Task<PreparedStatement> PrepareAsync(
string query,
Transaction transaction = default,
FlightCallOptions? options = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(query))
throw new ArgumentException("Query cannot be null or empty", nameof(query));
if (transaction == default)
{
transaction = Transaction.NoTransaction;
}
try
{
var command = new ActionCreatePreparedStatementRequest
{
Query = query
};
if (transaction.IsValid)
{
command.TransactionId = transaction.TransactionId;
}
var action = new FlightAction(SqlAction.CreateRequest, command.PackAndSerialize());
var call = _client.DoAction(action, options?.Headers);
var preparedStatementResponse = await ReadPreparedStatementAsync(call, cancellationToken).ConfigureAwait(false);
return new PreparedStatement(this,
preparedStatementResponse.PreparedStatementHandle.ToStringUtf8(),
SchemaExtensions.DeserializeSchema(preparedStatementResponse.DatasetSchema.ToByteArray()),
SchemaExtensions.DeserializeSchema(preparedStatementResponse.ParameterSchema.ToByteArray())
);
}
catch (RpcException ex)
{
throw new InvalidOperationException("Failed to prepare statement", ex);
}
}
private static async Task<ActionCreatePreparedStatementResult> ReadPreparedStatementAsync(
AsyncServerStreamingCall<FlightResult> call,
CancellationToken cancellationToken)
{
await foreach (var result in call.ResponseStream.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
var response = Any.Parser.ParseFrom(result.Body);
if (response.Is(ActionCreatePreparedStatementResult.Descriptor))
{
return response.Unpack<ActionCreatePreparedStatementResult>();
}
}
throw new InvalidOperationException("Server did not return a valid prepared statement response.");
}
}