blob: d0942669f3f60f1148665c9d89ff4ced6be3c9d3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Flight.Internal;
using Apache.Arrow.Flight.Protocol;
using Apache.Arrow.Flight.Server.Internal;
using Grpc.Core;
namespace Apache.Arrow.Flight.Client
{
public class FlightClient
{
internal static readonly Empty EmptyInstance = new Empty();
private readonly FlightService.FlightServiceClient _client;
public FlightClient(ChannelBase grpcChannel)
{
_client = new FlightService.FlightServiceClient(grpcChannel);
}
public FlightClient(CallInvoker callInvoker)
{
_client = new FlightService.FlightServiceClient(callInvoker);
}
public AsyncServerStreamingCall<FlightInfo> ListFlights(FlightCriteria criteria = null, Metadata headers = null)
{
return ListFlights(criteria, headers, null, CancellationToken.None);
}
public AsyncServerStreamingCall<FlightInfo> ListFlights(FlightCriteria criteria, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
if (criteria == null)
{
criteria = FlightCriteria.Empty;
}
var response = _client.ListFlights(criteria.ToProtocol(), headers, deadline, cancellationToken);
var convertStream = new StreamReader<Protocol.FlightInfo, FlightInfo>(response.ResponseStream, inFlight => new FlightInfo(inFlight));
return new AsyncServerStreamingCall<FlightInfo>(convertStream, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose);
}
public AsyncServerStreamingCall<FlightActionType> ListActions(Metadata headers = null)
{
return ListActions(headers, null, CancellationToken.None);
}
public AsyncServerStreamingCall<FlightActionType> ListActions(Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var response = _client.ListActions(EmptyInstance, headers, deadline, cancellationToken);
var convertStream = new StreamReader<Protocol.ActionType, FlightActionType>(response.ResponseStream, actionType => new FlightActionType(actionType));
return new AsyncServerStreamingCall<FlightActionType>(convertStream, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose);
}
public FlightRecordBatchStreamingCall GetStream(FlightTicket ticket, Metadata headers = null)
{
return GetStream(ticket, headers, null, CancellationToken.None);
}
public FlightRecordBatchStreamingCall GetStream(FlightTicket ticket, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var stream = _client.DoGet(ticket.ToProtocol(), headers, deadline, cancellationToken);
var responseStream = new FlightClientRecordBatchStreamReader(stream.ResponseStream);
return new FlightRecordBatchStreamingCall(responseStream, stream.ResponseHeadersAsync, stream.GetStatus, stream.GetTrailers, stream.Dispose);
}
public AsyncUnaryCall<FlightInfo> GetInfo(FlightDescriptor flightDescriptor, Metadata headers = null)
{
return GetInfo(flightDescriptor, headers, null, CancellationToken.None);
}
public AsyncUnaryCall<FlightInfo> GetInfo(FlightDescriptor flightDescriptor, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var flightInfoResult = _client.GetFlightInfoAsync(flightDescriptor.ToProtocol(), headers, deadline, cancellationToken);
var flightInfo = flightInfoResult
.ResponseAsync
.ContinueWith(async flightInfo => new FlightInfo(await flightInfo.ConfigureAwait(false)))
.Unwrap();
return new AsyncUnaryCall<FlightInfo>(
flightInfo,
flightInfoResult.ResponseHeadersAsync,
flightInfoResult.GetStatus,
flightInfoResult.GetTrailers,
flightInfoResult.Dispose);
}
/// <summary>
/// Start a Flight Put request.
/// </summary>
/// <param name="flightDescriptor">Descriptor for the data to be put</param>
/// <param name="headers">gRPC headers to send with the request</param>
/// <returns>A <see cref="FlightRecordBatchDuplexStreamingCall" /> object used to write data batches and receive responses</returns>
public FlightRecordBatchDuplexStreamingCall StartPut(FlightDescriptor flightDescriptor, Metadata headers = null)
{
return StartPut(flightDescriptor, headers, null, CancellationToken.None);
}
/// <summary>
/// Start a Flight Put request.
/// </summary>
/// <param name="flightDescriptor">Descriptor for the data to be put</param>
/// <param name="schema">The schema of the data</param>
/// <param name="headers">gRPC headers to send with the request</param>
/// <returns>A <see cref="FlightRecordBatchDuplexStreamingCall" /> object used to write data batches and receive responses</returns>
/// <remarks>Using this method rather than a StartPut overload that doesn't accept a schema
/// means that the schema is sent even if no data batches are sent</remarks>
public Task<FlightRecordBatchDuplexStreamingCall> StartPut(FlightDescriptor flightDescriptor, Schema schema, Metadata headers = null)
{
return StartPut(flightDescriptor, schema, headers, null, CancellationToken.None);
}
/// <summary>
/// Start a Flight Put request.
/// </summary>
/// <param name="flightDescriptor">Descriptor for the data to be put</param>
/// <param name="headers">gRPC headers to send with the request</param>
/// <param name="deadline">Optional deadline. The request will be cancelled if this deadline is reached.</param>
/// <param name="cancellationToken">Optional token for cancelling the request</param>
/// <returns>A <see cref="FlightRecordBatchDuplexStreamingCall" /> object used to write data batches and receive responses</returns>
public FlightRecordBatchDuplexStreamingCall StartPut(FlightDescriptor flightDescriptor, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var channels = _client.DoPut(headers, deadline, cancellationToken);
var requestStream = new FlightClientRecordBatchStreamWriter(channels.RequestStream, flightDescriptor);
var readStream = new StreamReader<Protocol.PutResult, FlightPutResult>(channels.ResponseStream, putResult => new FlightPutResult(putResult));
return new FlightRecordBatchDuplexStreamingCall(
requestStream,
readStream,
channels.ResponseHeadersAsync,
channels.GetStatus,
channels.GetTrailers,
channels.Dispose);
}
/// <summary>
/// Start a Flight Put request.
/// </summary>
/// <param name="flightDescriptor">Descriptor for the data to be put</param>
/// <param name="schema">The schema of the data</param>
/// <param name="headers">gRPC headers to send with the request</param>
/// <param name="deadline">Optional deadline. The request will be cancelled if this deadline is reached.</param>
/// <param name="cancellationToken">Optional token for cancelling the request</param>
/// <returns>A <see cref="FlightRecordBatchDuplexStreamingCall" /> object used to write data batches and receive responses</returns>
/// <remarks>Using this method rather than a StartPut overload that doesn't accept a schema
/// means that the schema is sent even if no data batches are sent</remarks>
public async Task<FlightRecordBatchDuplexStreamingCall> StartPut(FlightDescriptor flightDescriptor, Schema schema, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var channels = _client.DoPut(headers, deadline, cancellationToken);
var requestStream = new FlightClientRecordBatchStreamWriter(channels.RequestStream, flightDescriptor);
var readStream = new StreamReader<Protocol.PutResult, FlightPutResult>(channels.ResponseStream, putResult => new FlightPutResult(putResult));
var streamingCall = new FlightRecordBatchDuplexStreamingCall(
requestStream,
readStream,
channels.ResponseHeadersAsync,
channels.GetStatus,
channels.GetTrailers,
channels.Dispose);
await streamingCall.RequestStream.SetupStream(schema).ConfigureAwait(false);
return streamingCall;
}
public AsyncDuplexStreamingCall<FlightHandshakeRequest, FlightHandshakeResponse> Handshake(Metadata headers = null)
{
return Handshake(headers, null, CancellationToken.None);
}
public AsyncDuplexStreamingCall<FlightHandshakeRequest, FlightHandshakeResponse> Handshake(Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var channel = _client.Handshake(headers, deadline, cancellationToken);
var readStream = new StreamReader<HandshakeResponse, FlightHandshakeResponse>(channel.ResponseStream, response => new FlightHandshakeResponse(response));
var writeStream = new FlightHandshakeStreamWriterAdapter(channel.RequestStream);
var call = new AsyncDuplexStreamingCall<FlightHandshakeRequest, FlightHandshakeResponse>(
writeStream,
readStream,
channel.ResponseHeadersAsync,
channel.GetStatus,
channel.GetTrailers,
channel.Dispose);
return call;
}
public FlightRecordBatchExchangeCall DoExchange(FlightDescriptor flightDescriptor, Metadata headers = null)
{
return DoExchange(flightDescriptor, headers, null, CancellationToken.None);
}
public FlightRecordBatchExchangeCall DoExchange(FlightDescriptor flightDescriptor, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var channel = _client.DoExchange(headers, deadline, cancellationToken);
var requestStream = new FlightClientRecordBatchStreamWriter(channel.RequestStream, flightDescriptor);
var responseStream = new FlightClientRecordBatchStreamReader(channel.ResponseStream);
var call = new FlightRecordBatchExchangeCall(
requestStream,
responseStream,
channel.ResponseHeadersAsync,
channel.GetStatus,
channel.GetTrailers,
channel.Dispose);
return call;
}
public AsyncServerStreamingCall<FlightResult> DoAction(FlightAction action, Metadata headers = null)
{
return DoAction(action, headers, null, CancellationToken.None);
}
public AsyncServerStreamingCall<FlightResult> DoAction(FlightAction action, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var stream = _client.DoAction(action.ToProtocol(), headers, deadline, cancellationToken);
var streamReader = new StreamReader<Protocol.Result, FlightResult>(stream.ResponseStream, result => new FlightResult(result));
return new AsyncServerStreamingCall<FlightResult>(streamReader, stream.ResponseHeadersAsync, stream.GetStatus, stream.GetTrailers, stream.Dispose);
}
public AsyncUnaryCall<Schema> GetSchema(FlightDescriptor flightDescriptor, Metadata headers = null)
{
return GetSchema(flightDescriptor, headers, null, CancellationToken.None);
}
public AsyncUnaryCall<Schema> GetSchema(FlightDescriptor flightDescriptor, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var schemaResult = _client.GetSchemaAsync(flightDescriptor.ToProtocol(), headers, deadline, cancellationToken);
var schema = schemaResult
.ResponseAsync
.ContinueWith(async schema => FlightMessageSerializer.DecodeSchema((await schemaResult.ResponseAsync.ConfigureAwait(false)).Schema.Memory))
.Unwrap();
return new AsyncUnaryCall<Schema>(
schema,
schemaResult.ResponseHeadersAsync,
schemaResult.GetStatus,
schemaResult.GetTrailers,
schemaResult.Dispose);
}
}
}