blob: 17a5e6f84c64f5fabe957f1eba7b62176d731eb5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Apache.Arrow.Flight.Client;
using Apache.Arrow.Flight.Sql.Client;
using Apache.Arrow.Flight.TestWeb;
using Apache.Arrow.Types;
using Arrow.Flight.Protocol.Sql;
using Google.Protobuf;
using Grpc.Core.Utils;
using Xunit;
namespace Apache.Arrow.Flight.Sql.Tests;
public class FlightSqlClientTests : IDisposable
{
readonly TestFlightSqlWebFactory _testWebFactory;
readonly FlightStore _flightStore;
private readonly FlightSqlClient _flightSqlClient;
private readonly FlightSqlTestUtils _testUtils;
public FlightSqlClientTests()
{
_flightStore = new FlightStore();
_testWebFactory = new TestFlightSqlWebFactory(_flightStore);
FlightClient flightClient = new(_testWebFactory.GetChannel());
_flightSqlClient = new FlightSqlClient(flightClient);
_testUtils = new FlightSqlTestUtils(_testWebFactory, _flightStore);
}
#region Transactions
[Fact]
public async Task CommitTransactionAsync()
{
// Arrange
string transactionId = "sample-transaction-id";
var transaction = new Transaction(transactionId);
// Act
var streamCall = _flightSqlClient.CommitAsync(transaction);
var result = await streamCall.ResponseStream.ToListAsync();
// Assert
Assert.NotNull(result);
Assert.Equal(transaction.TransactionId, result.FirstOrDefault()?.Body);
}
[Fact]
public async Task BeginTransactionAsync()
{
// Arrange
string expectedTransactionId = "sample-transaction-id";
// Act
var transaction = await _flightSqlClient.BeginTransactionAsync();
// Assert
Assert.NotEqual(Transaction.NoTransaction, transaction);
Assert.Equal(ByteString.CopyFromUtf8(expectedTransactionId), transaction.TransactionId);
}
[Fact]
public async Task RollbackTransactionAsync()
{
// Arrange
string transactionId = "sample-transaction-id";
var transaction = new Transaction(transactionId);
// Act
var streamCall = _flightSqlClient.RollbackAsync(transaction);
var result = await streamCall.ResponseStream.ToListAsync();
// Assert
Assert.Equal(result.FirstOrDefault()?.Body, transaction.TransactionId);
}
#endregion
#region PreparedStatement
[Theory]
[InlineData("sample-transaction-id", true)]
[InlineData(null, false)]
public async Task PreparedAsync(string transactionId, bool expectTransaction)
{
// Arrange
string query = "INSERT INTO users (id, name) VALUES (1, 'John Doe')";
var transaction = string.IsNullOrEmpty(transactionId)
? Transaction.NoTransaction
: new Transaction(transactionId);
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
// Create a sample schema for the dataset and parameters
var schema = new Schema.Builder()
.Field(f => f.Name("id").DataType(Int32Type.Default))
.Field(f => f.Name("name").DataType(StringType.Default))
.Build();
var recordBatch = new RecordBatch(schema, new Array[]
{
new Int32Array.Builder().Append(1).Build(),
new StringArray.Builder().Append("John Doe").Build()
}, 1);
var flightHolder = new FlightHolder(flightDescriptor, schema, _testWebFactory.GetAddress());
flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
_flightStore.Flights.Add(flightDescriptor, flightHolder);
var datasetSchemaBytes = SchemaExtensions.SerializeSchema(schema);
var parameterSchemaBytes = SchemaExtensions.SerializeSchema(schema);
var preparedStatementResponse = new ActionCreatePreparedStatementResult
{
PreparedStatementHandle = ByteString.CopyFromUtf8("prepared-handle"),
DatasetSchema = ByteString.CopyFrom(datasetSchemaBytes),
ParameterSchema = ByteString.CopyFrom(parameterSchemaBytes)
};
// Act
PreparedStatement preparedStatement;
if (expectTransaction)
{
preparedStatement = await _flightSqlClient.PrepareAsync(query, transaction);
}
else
{
preparedStatement = await _flightSqlClient.PrepareAsync(query);
}
var deserializedDatasetSchema =
SchemaExtensions.DeserializeSchema(preparedStatementResponse.DatasetSchema.ToByteArray());
var deserializedParameterSchema =
SchemaExtensions.DeserializeSchema(preparedStatementResponse.ParameterSchema.ToByteArray());
// Assert
Assert.NotNull(preparedStatement);
Assert.NotNull(deserializedDatasetSchema);
Assert.NotNull(deserializedParameterSchema);
CompareSchemas(schema, deserializedDatasetSchema);
CompareSchemas(schema, deserializedParameterSchema);
}
#endregion
[Fact]
public async Task ExecuteUpdateAsync()
{
// Arrange
string query = "UPDATE test_table SET column1 = 'value'";
var transaction = new Transaction("sample-transaction-id");
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var schema = new Schema.Builder()
.Field(f => f.Name("id").DataType(Int32Type.Default))
.Field(f => f.Name("name").DataType(StringType.Default))
.Build();
var recordBatch = new RecordBatch(schema, new IArrowArray[]
{
new Int32Array.Builder().AppendRange([1, 2, 3, 4, 5]).Build(),
new StringArray.Builder().AppendRange(["John Doe", "Jane Doe", "Alice", "Bob", "Charlie"]).Build()
}, 5);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, _testWebFactory.GetAddress());
flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
long affectedRows = await _flightSqlClient.ExecuteUpdateAsync(query, transaction);
// Assert
Assert.Equal(5, affectedRows);
}
[Fact]
public async Task ExecuteAsync()
{
// Arrange
string query = "SELECT * FROM test_table";
var transaction = new Transaction("sample-transaction-id");
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, _testWebFactory.GetAddress());
flightHolder.AddBatch(new RecordBatchWithMetadata(recordBatch));
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var flightInfo = await _flightSqlClient.ExecuteAsync(query, transaction);
// Assert
Assert.NotNull(flightInfo);
Assert.Single(flightInfo.Endpoints);
}
[Fact]
public async Task ExecuteAsync_ShouldReturnFlightInfo_WhenValidInputsAreProvided()
{
// Arrange
string query = "SELECT * FROM test_table";
var transaction = new Transaction("sample-transaction-id");
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var flightInfo = await _flightSqlClient.ExecuteAsync(query, transaction);
// Assert
Assert.NotNull(flightInfo);
Assert.IsType<FlightInfo>(flightInfo);
}
[Fact]
public async Task ExecuteAsync_ShouldThrowArgumentException_WhenQueryIsEmpty()
{
// Arrange
string emptyQuery = string.Empty;
var transaction = new Transaction("sample-transaction-id");
// Act & Assert
await Assert.ThrowsAsync<ArgumentException>(async () =>
await _flightSqlClient.ExecuteAsync(emptyQuery, transaction));
}
[Fact]
public async Task ExecuteAsync_ShouldReturnFlightInfo_WhenTransactionIsNoTransaction()
{
// Arrange
string query = "SELECT * FROM test_table";
var transaction = Transaction.NoTransaction;
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var flightInfo = await _flightSqlClient.ExecuteAsync(query, transaction);
// Assert
Assert.NotNull(flightInfo);
Assert.IsType<FlightInfo>(flightInfo);
}
[Fact]
public async Task GetFlightInfoAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var flightInfo = await _flightSqlClient.GetFlightInfoAsync(flightDescriptor);
// Assert
Assert.NotNull(flightInfo);
}
[Fact]
public async Task GetExecuteSchemaAsync()
{
// Arrange
string query = "SELECT * FROM test_table";
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
Schema resultSchema =
await _flightSqlClient.GetExecuteSchemaAsync(query, new Transaction("sample-transaction-id"));
// Assert
Assert.NotNull(resultSchema);
Assert.Equal(recordBatch.Schema.FieldsList.Count, resultSchema.FieldsList.Count);
CompareSchemas(resultSchema, recordBatch.Schema);
}
[Fact]
public async Task GetCatalogsAsync()
{
// Arrange
var options = new FlightCallOptions();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var result = await _flightSqlClient.GetCatalogsAsync(options);
// Assert
Assert.NotNull(result);
Assert.Equal(flightHolder.GetFlightInfo().Endpoints.Count, result.Endpoints.Count);
Assert.Equal(flightDescriptor, result.Descriptor);
}
[Fact]
public async Task GetSchemaAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var result = await _flightSqlClient.GetSchemaAsync(flightDescriptor);
// Assert
Assert.NotNull(result);
Assert.Equal(recordBatch.Schema.FieldsList.Count, result.FieldsList.Count);
CompareSchemas(result, recordBatch.Schema);
}
[Fact]
public async Task GetDbSchemasAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
string catalog = "test-catalog";
string dbSchemaFilterPattern = "test-schema-pattern";
// Act
var result = await _flightSqlClient.GetDbSchemasAsync(catalog, dbSchemaFilterPattern);
// Assert
Assert.NotNull(result);
var expectedFlightInfo = flightHolder.GetFlightInfo();
Assert.Equal(recordBatch.Schema.FieldsList.Count, result.Schema.FieldsList.Count);
Assert.Equal(expectedFlightInfo.Descriptor.Command, result.Descriptor.Command);
Assert.Equal(expectedFlightInfo.Descriptor.Type, result.Descriptor.Type);
Assert.Equal(expectedFlightInfo.Schema.FieldsList.Count, result.Schema.FieldsList.Count);
Assert.Equal(expectedFlightInfo.Endpoints.Count, result.Endpoints.Count);
for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++)
{
var expectedField = expectedFlightInfo.Schema.FieldsList[i];
var actualField = result.Schema.FieldsList[i];
Assert.Equal(expectedField.Name, actualField.Name);
Assert.Equal(expectedField.DataType, actualField.DataType);
Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
Assert.Equal(expectedField.Metadata?.Count ?? 0, actualField.Metadata?.Count ?? 0);
}
for (int i = 0; i < expectedFlightInfo.Endpoints.Count; i++)
{
var expectedEndpoint = expectedFlightInfo.Endpoints[i];
var actualEndpoint = result.Endpoints[i];
Assert.Equal(expectedEndpoint.Ticket, actualEndpoint.Ticket);
Assert.Equal(expectedEndpoint.Locations.Count(), actualEndpoint.Locations.Count());
}
}
[Fact]
public async Task GetPrimaryKeysAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var tableRef = new TableRef("test-catalog", "test-schema", "test-table");
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var result = await _flightSqlClient.GetPrimaryKeysAsync(tableRef);
// Assert
Assert.NotNull(result);
var expectedFlightInfo = flightHolder.GetFlightInfo();
Assert.Equal(expectedFlightInfo.Descriptor.Command, result.Descriptor.Command);
Assert.Equal(expectedFlightInfo.Descriptor.Type, result.Descriptor.Type);
Assert.Equal(expectedFlightInfo.Schema.FieldsList.Count, result.Schema.FieldsList.Count);
for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++)
{
var expectedField = expectedFlightInfo.Schema.FieldsList[i];
var actualField = result.Schema.FieldsList[i];
Assert.Equal(expectedField.Name, actualField.Name);
Assert.Equal(expectedField.DataType, actualField.DataType);
Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
Assert.Equal(expectedField.Metadata?.Count ?? 0, actualField.Metadata?.Count ?? 0);
}
Assert.Equal(expectedFlightInfo.Endpoints.Count, result.Endpoints.Count);
for (int i = 0; i < expectedFlightInfo.Endpoints.Count; i++)
{
var expectedEndpoint = expectedFlightInfo.Endpoints[i];
var actualEndpoint = result.Endpoints[i];
Assert.Equal(expectedEndpoint.Ticket, actualEndpoint.Ticket);
Assert.Equal(expectedEndpoint.Locations.Count(), actualEndpoint.Locations.Count());
}
}
[Fact]
public async Task GetTablesAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
string catalog = "sample_catalog";
string dbSchemaFilterPattern = "sample_schema";
string tableFilterPattern = "sample_table";
bool includeSchema = true;
var tableTypes = new List<string> { "BASE TABLE" };
// Act
var result = await _flightSqlClient.GetTablesAsync(catalog, dbSchemaFilterPattern, tableFilterPattern,
includeSchema, tableTypes);
// Assert
Assert.NotNull(result);
Assert.Single(result);
var expectedFlightInfo = flightHolder.GetFlightInfo();
var flightInfo = result.First();
Assert.Equal(expectedFlightInfo.Descriptor.Command, flightInfo.Descriptor.Command);
Assert.Equal(expectedFlightInfo.Descriptor.Type, flightInfo.Descriptor.Type);
Assert.Equal(expectedFlightInfo.Schema.FieldsList.Count, flightInfo.Schema.FieldsList.Count);
for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++)
{
var expectedField = expectedFlightInfo.Schema.FieldsList[i];
var actualField = flightInfo.Schema.FieldsList[i];
Assert.Equal(expectedField.Name, actualField.Name);
Assert.Equal(expectedField.DataType, actualField.DataType);
Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
}
Assert.Equal(expectedFlightInfo.Endpoints.Count, flightInfo.Endpoints.Count);
}
[Fact]
public async Task GetCatalogsSchemaAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var schema = await _flightSqlClient.GetCatalogsSchemaAsync();
// Assert
Assert.NotNull(schema);
var expectedFlightInfo = flightHolder.GetFlightInfo();
for (int i = 0; i < expectedFlightInfo.Schema.FieldsList.Count; i++)
{
var expectedField = expectedFlightInfo.Schema.FieldsList[i];
var actualField = schema.FieldsList[i];
Assert.Equal(expectedField.Name, actualField.Name);
Assert.Equal(expectedField.DataType, actualField.DataType);
Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
}
}
[Fact]
public async Task GetDbSchemasSchemaAsync()
{
// Arrange
var options = new FlightCallOptions();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var schema = await _flightSqlClient.GetDbSchemasSchemaAsync(options);
// Assert
Assert.NotNull(schema);
for (int i = 0; i < schema.FieldsList.Count; i++)
{
var expectedField = schema.FieldsList[i];
var actualField = schema.FieldsList[i];
Assert.Equal(expectedField.Name, actualField.Name);
Assert.Equal(expectedField.DataType, actualField.DataType);
Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
}
}
[Fact]
public async Task DoPutAsync()
{
// Arrange
var schema = new Schema.Builder()
.Field(f => f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false))
.Field(f => f.Name("TYPE_NAME").DataType(StringType.Default).Nullable(false))
.Field(f => f.Name("PRECISION").DataType(Int32Type.Default).Nullable(false))
.Field(f => f.Name("LITERAL_PREFIX").DataType(StringType.Default).Nullable(false))
.Field(f => f.Name("COLUMN_SIZE").DataType(Int32Type.Default).Nullable(false))
.Build();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
int[] dataTypeIds = { 1, 2, 3 };
string[] typeNames = ["INTEGER", "VARCHAR", "BOOLEAN"];
int[] precisions = { 32, 255, 1 };
string[] literalPrefixes = ["N'", "'", "b'"];
int[] columnSizes = [10, 255, 1];
var recordBatch = new RecordBatch(schema,
[
new Int32Array.Builder().AppendRange(dataTypeIds).Build(),
new StringArray.Builder().AppendRange(typeNames).Build(),
new Int32Array.Builder().AppendRange(precisions).Build(),
new StringArray.Builder().AppendRange(literalPrefixes).Build(),
new Int32Array.Builder().AppendRange(columnSizes).Build()
], 5);
Assert.NotNull(recordBatch);
Assert.Equal(5, recordBatch.Length);
var flightHolder = new FlightHolder(flightDescriptor, schema, _testWebFactory.GetAddress());
flightHolder.AddBatch(new RecordBatchWithMetadata(_testUtils.CreateTestBatch(0, 100)));
_flightStore.Flights.Add(flightDescriptor, flightHolder);
var expectedBatch = _testUtils.CreateTestBatch(0, 100);
// Act
var result = await _flightSqlClient.DoPutAsync(flightDescriptor, expectedBatch);
// Assert
Assert.NotNull(result);
}
[Fact]
public async Task GetExportedKeysAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var tableRef = new TableRef("test-catalog", "test-schema", "test-table");
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var flightInfo = await _flightSqlClient.GetExportedKeysAsync(tableRef);
// Assert
Assert.NotNull(flightInfo);
Assert.Equal("test", flightInfo.Descriptor.Command.ToStringUtf8());
}
[Fact]
public async Task GetExportedKeysSchemaAsync()
{
// Arrange
var options = new FlightCallOptions();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var tableRef = new TableRef("test-catalog", "test-schema", "test-table");
var schema = await _flightSqlClient.GetExportedKeysSchemaAsync(tableRef);
// Assert
Assert.NotNull(schema);
Assert.True(schema.FieldsList.Count > 0, "Schema should contain fields.");
Assert.Equal("test", schema.FieldsList.First().Name);
}
[Fact]
public async Task GetImportedKeysAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var flightInfo =
await _flightSqlClient.GetImportedKeysAsync(new TableRef("test-catalog", "test-schema", "test-table"));
// Assert
Assert.NotNull(flightInfo);
for (int i = 0; i < recordBatch.Schema.FieldsList.Count; i++)
{
var expectedField = recordBatch.Schema.FieldsList[i];
var actualField = flightInfo.Schema.FieldsList[i];
Assert.Equal(expectedField.Name, actualField.Name);
Assert.Equal(expectedField.DataType, actualField.DataType);
Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
}
}
[Fact]
public async Task GetImportedKeysSchemaAsync()
{
// Arrange
var options = new FlightCallOptions();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var schema = await _flightSqlClient.GetImportedKeysSchemaAsync(options);
// Assert
var expectedSchema = recordBatch.Schema;
Assert.NotNull(schema);
Assert.Equal(expectedSchema.FieldsList.Count, schema.FieldsList.Count);
CompareSchemas(expectedSchema, schema);
}
[Fact]
public async Task GetCrossReferenceAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema, _testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
var pkTableRef = new TableRef("PKCatalog", "PKSchema", "PKTable");
var fkTableRef = new TableRef("FKCatalog", "FKSchema", "FKTable");
// Act
var flightInfo = await _flightSqlClient.GetCrossReferenceAsync(pkTableRef, fkTableRef);
// Assert
Assert.NotNull(flightInfo);
Assert.Equal(flightDescriptor, flightInfo.Descriptor);
Assert.Single(flightInfo.Schema.FieldsList);
}
[Fact]
public async Task GetCrossReferenceSchemaAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var recordBatch = _testUtils.CreateTestBatch(0, 100);
var flightHolder = new FlightHolder(flightDescriptor, recordBatch.Schema,
_testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var schema = await _flightSqlClient.GetCrossReferenceSchemaAsync();
// Assert
var expectedSchema = recordBatch.Schema;
Assert.NotNull(schema);
Assert.Equal(expectedSchema.FieldsList.Count, schema.FieldsList.Count);
}
[Fact]
public async Task GetTableTypesAsync()
{
// Arrange
var expectedSchema = new Schema.Builder()
.Field(f => f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false))
.Build();
var commandGetTableTypes = new CommandGetTableTypes();
byte[] packedCommand = commandGetTableTypes.PackAndSerialize().ToByteArray();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor(packedCommand);
var flightHolder = new FlightHolder(flightDescriptor, expectedSchema, "http://localhost:5000");
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var flightInfo = await _flightSqlClient.GetTableTypesAsync();
var actualSchema = flightInfo.Schema;
// Assert
Assert.NotNull(flightInfo);
CompareSchemas(expectedSchema, actualSchema);
}
[Fact]
public async Task GetTableTypesSchemaAsync()
{
// Arrange
var options = new FlightCallOptions();
var expectedSchema = new Schema.Builder()
.Field(f => f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false))
.Build();
var commandGetTableTypesSchema = new CommandGetTableTypes();
byte[] packedCommand = commandGetTableTypesSchema.PackAndSerialize().ToByteArray();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor(packedCommand);
var flightHolder = new FlightHolder(flightDescriptor, expectedSchema, "http://localhost:5000");
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var schemaResult = await _flightSqlClient.GetTableTypesSchemaAsync(options);
// Assert
Assert.NotNull(schemaResult);
CompareSchemas(expectedSchema, schemaResult);
}
[Fact]
public async Task GetXdbcTypeInfoAsync()
{
// Arrange
var options = new FlightCallOptions();
var expectedSchema = new Schema.Builder()
.Field(f => f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false))
.Field(f => f.Name("TYPE_NAME").DataType(StringType.Default).Nullable(false))
.Field(f => f.Name("PRECISION").DataType(Int32Type.Default).Nullable(false))
.Field(f => f.Name("LITERAL_PREFIX").DataType(StringType.Default).Nullable(false))
.Field(f => f.Name("COLUMN_SIZE").DataType(Int32Type.Default).Nullable(false))
.Build();
var commandGetXdbcTypeInfo = new CommandGetXdbcTypeInfo();
byte[] packedCommand = commandGetXdbcTypeInfo.PackAndSerialize().ToByteArray();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor(packedCommand);
// Creating a flight holder with the expected schema and adding it to the flight store
var flightHolder = new FlightHolder(flightDescriptor, expectedSchema, "http://localhost:5000");
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var flightInfo = await _flightSqlClient.GetXdbcTypeInfoAsync(options);
// Assert
Assert.NotNull(flightInfo);
CompareSchemas(expectedSchema, flightInfo.Schema);
}
[Fact]
public async Task GetXdbcTypeInfoSchemaAsync()
{
// Arrange
var options = new FlightCallOptions();
var expectedSchema = new Schema.Builder()
.Field(f => f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false))
.Field(f => f.Name("TYPE_NAME").DataType(StringType.Default).Nullable(false))
.Field(f => f.Name("PRECISION").DataType(Int32Type.Default).Nullable(false))
.Field(f => f.Name("LITERAL_PREFIX").DataType(StringType.Default).Nullable(false))
.Build();
var commandGetXdbcTypeInfo = new CommandGetXdbcTypeInfo();
byte[] packedCommand = commandGetXdbcTypeInfo.PackAndSerialize().ToByteArray();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor(packedCommand);
var flightHolder = new FlightHolder(flightDescriptor, expectedSchema, "http://localhost:5000");
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var schema = await _flightSqlClient.GetXdbcTypeInfoSchemaAsync(options);
// Assert
Assert.NotNull(schema);
CompareSchemas(expectedSchema, schema);
}
[Fact]
public async Task GetSqlInfoSchemaAsync()
{
// Arrange
var options = new FlightCallOptions();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("sqlInfo");
var expectedSchema = new Schema.Builder()
.Field(f => f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false))
.Build();
var flightHolder = new FlightHolder(flightDescriptor, expectedSchema, _testWebFactory.GetAddress());
_flightStore.Flights.Add(flightDescriptor, flightHolder);
// Act
var schema = await _flightSqlClient.GetSqlInfoSchemaAsync(options);
// Assert
Assert.NotNull(schema);
CompareSchemas(expectedSchema, schema);
}
[Fact]
public async Task CancelFlightInfoAsync()
{
// Arrange
var schema = new Schema.Builder()
.Field(f => f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false))
.Build();
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var flightInfo = new FlightInfo(schema, flightDescriptor, new List<FlightEndpoint>(), 0, 0);
var cancelRequest = new FlightInfoCancelRequest(flightInfo);
// Act
var cancelResult = await _flightSqlClient.CancelFlightInfoAsync(cancelRequest);
// Assert
Assert.Equal(1, cancelResult.GetCancelStatus());
}
[Fact]
public async Task CancelQueryAsync()
{
// Arrange
var flightDescriptor = FlightDescriptor.CreateCommandDescriptor("test");
var schema = new Schema.Builder()
.Field(f => f.Name("DATA_TYPE_ID").DataType(Int32Type.Default).Nullable(false))
.Build();
var flightInfo = new FlightInfo(schema, flightDescriptor, new List<FlightEndpoint>(), 0, 0);
// Adding the flight info to the flight store for testing
_flightStore.Flights.Add(flightDescriptor,
new FlightHolder(flightDescriptor, schema, _testWebFactory.GetAddress()));
// Act
var cancelStatus = await _flightSqlClient.CancelQueryAsync(flightInfo);
// Assert
Assert.Equal(1, cancelStatus.GetCancelStatus());
}
public void Dispose() => _testWebFactory?.Dispose();
private void CompareSchemas(Schema expectedSchema, Schema actualSchema)
{
Assert.Equal(expectedSchema.FieldsList.Count, actualSchema.FieldsList.Count);
for (int i = 0; i < expectedSchema.FieldsList.Count; i++)
{
var expectedField = expectedSchema.FieldsList[i];
var actualField = actualSchema.FieldsList[i];
Assert.Equal(expectedField.Name, actualField.Name);
Assert.Equal(expectedField.DataType, actualField.DataType);
Assert.Equal(expectedField.IsNullable, actualField.IsNullable);
Assert.Equal(expectedField.Metadata, actualField.Metadata);
}
}
}