blob: e675cb1cefd7cd67b18bfb74ddeb4524f6403437 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Threading.Tasks;
using Apache.Arrow.Flight.Internal;
using Apache.Arrow.Flight.Protocol;
using Grpc.Core;
namespace Apache.Arrow.Flight.Server.Internal
{
/// <summary>
/// gRPC service implementation that forwards every Flight RPC to a <see cref="FlightServer"/>.
/// Each handler wraps the protocol-level request/stream objects in their Flight counterparts
/// and then delegates to the matching method on the wrapped server.
/// This class has to be internal, since the generated code from proto is set as internal.
/// </summary>
internal class FlightServerImplementation : FlightService.FlightServiceBase
{
    private readonly FlightServer _flightServer;

    /// <summary>
    /// Creates the adapter around the server that receives the translated calls.
    /// </summary>
    /// <param name="flightServer">The server every RPC is delegated to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="flightServer"/> is null.</exception>
    public FlightServerImplementation(FlightServer flightServer)
    {
        // Fail fast here rather than with a NullReferenceException inside the first RPC.
        _flightServer = flightServer ?? throw new ArgumentNullException(nameof(flightServer));
    }

    /// <summary>
    /// Handles the DoPut RPC: wraps the incoming flight data in a record batch stream reader,
    /// wraps the response stream so that <see cref="FlightPutResult"/> values are converted
    /// with <c>ToProtocol()</c>, and delegates to the server.
    /// </summary>
    /// <param name="requestStream">Incoming protocol flight data.</param>
    /// <param name="responseStream">Outgoing protocol put results.</param>
    /// <param name="context">The gRPC call context, passed through unchanged.</param>
    public override async Task DoPut(IAsyncStreamReader<Protocol.FlightData> requestStream, IServerStreamWriter<Protocol.PutResult> responseStream, ServerCallContext context)
    {
        var batchReader = new FlightServerRecordBatchStreamReader(requestStream);
        var resultWriter = new StreamWriter<FlightPutResult, PutResult>(responseStream, putResult => putResult.ToProtocol());

        await _flightServer.DoPut(batchReader, resultWriter, context).ConfigureAwait(false);
    }

    /// <summary>
    /// Handles the DoGet RPC: builds a <see cref="FlightTicket"/> from the request's ticket
    /// payload, wraps the response stream in a record batch stream writer, and delegates
    /// to the server.
    /// </summary>
    /// <param name="request">The protocol ticket.</param>
    /// <param name="responseStream">Outgoing protocol flight data.</param>
    /// <param name="context">The gRPC call context, passed through unchanged.</param>
    public override async Task DoGet(Protocol.Ticket request, IServerStreamWriter<Protocol.FlightData> responseStream, ServerCallContext context)
    {
        // async/await keeps a failure while building the wrappers inside the returned task,
        // matching the other handlers of this class.
        var ticket = new FlightTicket(request.Ticket_);
        var batchWriter = new FlightServerRecordBatchStreamWriter(responseStream);

        await _flightServer.DoGet(ticket, batchWriter, context).ConfigureAwait(false);
    }

    /// <summary>
    /// Handles the ListFlights RPC: wraps the criteria in a <see cref="FlightCriteria"/>,
    /// wraps the response stream so that <see cref="FlightInfo"/> values are converted with
    /// <c>ToProtocol()</c>, and delegates to the server.
    /// </summary>
    /// <param name="request">The protocol criteria.</param>
    /// <param name="responseStream">Outgoing protocol flight infos.</param>
    /// <param name="context">The gRPC call context, passed through unchanged.</param>
    public override async Task ListFlights(Protocol.Criteria request, IServerStreamWriter<Protocol.FlightInfo> responseStream, ServerCallContext context)
    {
        var infoWriter = new StreamWriter<FlightInfo, Protocol.FlightInfo>(responseStream, flightInfo => flightInfo.ToProtocol());
        var criteria = new FlightCriteria(request);

        await _flightServer.ListFlights(criteria, infoWriter, context).ConfigureAwait(false);
    }

    /// <summary>
    /// Handles the DoAction RPC: wraps the request in a <see cref="FlightAction"/>, wraps the
    /// response stream so that <see cref="FlightResult"/> values are converted with
    /// <c>ToProtocol()</c>, and delegates to the server.
    /// </summary>
    /// <param name="request">The protocol action.</param>
    /// <param name="responseStream">Outgoing protocol results.</param>
    /// <param name="context">The gRPC call context, passed through unchanged.</param>
    public override async Task DoAction(Protocol.Action request, IServerStreamWriter<Protocol.Result> responseStream, ServerCallContext context)
    {
        var flightAction = new FlightAction(request);
        var resultWriter = new StreamWriter<FlightResult, Protocol.Result>(responseStream, result => result.ToProtocol());

        await _flightServer.DoAction(flightAction, resultWriter, context).ConfigureAwait(false);
    }

    /// <summary>
    /// Handles the GetSchema RPC: asks the server for the schema of the described flight and
    /// returns it serialized inside a <see cref="SchemaResult"/>.
    /// </summary>
    /// <param name="request">The protocol flight descriptor.</param>
    /// <param name="context">The gRPC call context, passed through unchanged.</param>
    /// <returns>A result whose <c>Schema</c> holds the output of <c>SchemaWriter.SerializeSchema</c>.</returns>
    public override async Task<SchemaResult> GetSchema(Protocol.FlightDescriptor request, ServerCallContext context)
    {
        var descriptor = new FlightDescriptor(request);
        var schema = await _flightServer.GetSchema(descriptor, context).ConfigureAwait(false);

        return new SchemaResult()
        {
            Schema = SchemaWriter.SerializeSchema(schema)
        };
    }

    /// <summary>
    /// Handles the GetFlightInfo RPC: asks the server for the info of the described flight and
    /// converts it to its protocol representation.
    /// </summary>
    /// <param name="request">The protocol flight descriptor.</param>
    /// <param name="context">The gRPC call context, passed through unchanged.</param>
    /// <returns>The server's <see cref="FlightInfo"/> converted with <c>ToProtocol()</c>.</returns>
    public override async Task<Protocol.FlightInfo> GetFlightInfo(Protocol.FlightDescriptor request, ServerCallContext context)
    {
        var descriptor = new FlightDescriptor(request);
        FlightInfo info = await _flightServer.GetFlightInfo(descriptor, context).ConfigureAwait(false);

        return info.ToProtocol();
    }

    /// <summary>
    /// Handles the DoExchange RPC: wraps both flight data streams in record batch stream
    /// reader/writer objects and delegates to the server.
    /// </summary>
    /// <param name="requestStream">Incoming protocol flight data.</param>
    /// <param name="responseStream">Outgoing protocol flight data.</param>
    /// <param name="context">The gRPC call context, passed through unchanged.</param>
    public override async Task DoExchange(IAsyncStreamReader<Protocol.FlightData> requestStream, IServerStreamWriter<Protocol.FlightData> responseStream, ServerCallContext context)
    {
        var batchReader = new FlightServerRecordBatchStreamReader(requestStream);
        var batchWriter = new FlightServerRecordBatchStreamWriter(responseStream);

        await _flightServer.DoExchange(batchReader, batchWriter, context).ConfigureAwait(false);
    }

    /// <summary>
    /// Handles the Handshake RPC: wraps the request stream so that each protocol request is
    /// turned into a <see cref="FlightHandshakeRequest"/>, wraps the response stream so that
    /// <see cref="FlightHandshakeResponse"/> values are converted with <c>ToProtocol()</c>,
    /// and delegates to the server.
    /// </summary>
    /// <param name="requestStream">Incoming protocol handshake requests.</param>
    /// <param name="responseStream">Outgoing protocol handshake responses.</param>
    /// <param name="context">The gRPC call context, passed through unchanged.</param>
    public override async Task Handshake(IAsyncStreamReader<HandshakeRequest> requestStream, IServerStreamWriter<HandshakeResponse> responseStream, ServerCallContext context)
    {
        var handshakeReader = new StreamReader<HandshakeRequest, FlightHandshakeRequest>(requestStream, request => new FlightHandshakeRequest(request));
        var handshakeWriter = new StreamWriter<FlightHandshakeResponse, Protocol.HandshakeResponse>(responseStream, result => result.ToProtocol());

        await _flightServer.Handshake(handshakeReader, handshakeWriter, context).ConfigureAwait(false);
    }

    /// <summary>
    /// Handles the ListActions RPC: wraps the response stream so that
    /// <see cref="FlightActionType"/> values are converted with <c>ToProtocol()</c> and
    /// delegates to the server. The request message itself is not used.
    /// </summary>
    /// <param name="request">The empty protocol request; ignored.</param>
    /// <param name="responseStream">Outgoing protocol action types.</param>
    /// <param name="context">The gRPC call context, passed through unchanged.</param>
    public override async Task ListActions(Empty request, IServerStreamWriter<Protocol.ActionType> responseStream, ServerCallContext context)
    {
        var actionTypeWriter = new StreamWriter<FlightActionType, Protocol.ActionType>(responseStream, (actionType) => actionType.ToProtocol());

        await _flightServer.ListActions(actionTypeWriter, context).ConfigureAwait(false);
    }
}
}