// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::pin::Pin;
use futures::Stream;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
use datafusion::datasource::TableProvider;
use datafusion::prelude::*;
use datafusion::{datasource::parquet::ParquetTable, physical_plan::collect};
use arrow_flight::{
flight_service_server::FlightService, flight_service_server::FlightServiceServer,
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket,
// NOTE(review): the preceding `use arrow_flight::{ ... }` list is never closed
// with `};` in this file — a line appears to be missing before this struct.
/// Field-less Arrow Flight service that exposes Parquet schema lookup and SQL
/// query execution (via DataFusion) over gRPC.
pub struct FlightServiceImpl {}
// Arrow Flight gRPC service implementation. Only `get_schema` and `do_get`
// do real work here; every other RPC returns `Status::unimplemented`.
// NOTE(review): the methods of this impl are `async fn`s inside a trait impl;
// presumably the `#[tonic::async_trait]` attribute is required on this impl —
// confirm it was not dropped.
impl FlightService for FlightServiceImpl {
// Each associated type names the pinned, boxed response stream returned by
// the corresponding streaming RPC. All of them are `Send + Sync + 'static`.
type HandshakeStream = Pin<
Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + Sync + 'static>,
// NOTE(review): `HandshakeStream` opens `Pin<` but is never closed with `>;`
// — a line appears to be missing here.
type ListFlightsStream =
Pin<Box<dyn Stream<Item = Result<FlightInfo, Status>> + Send + Sync + 'static>>;
type DoGetStream =
Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
type DoPutStream =
Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + Sync + 'static>>;
type DoActionStream = Pin<
// NOTE(review): unlike the sibling stream types, there is no `Box<` between
// `Pin<` and `dyn Stream`, and the definition is never closed with `>>;` —
// lines appear to be missing.
dyn Stream<Item = Result<arrow_flight::Result, Status>>
+ Send
+ Sync
+ 'static,
type ListActionsStream =
Pin<Box<dyn Stream<Item = Result<ActionType, Status>> + Send + Sync + 'static>>;
type DoExchangeStream =
Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
// Opens the Parquet file named by the first element of the descriptor's
// `path` and builds a Flight `SchemaResult` from an Arrow schema (presumably
// the table's schema — the rest of the body is missing below).
// NOTE(review): the `&self` receiver presumably required by the
// `FlightService` trait is not present in this signature — confirm.
async fn get_schema(
request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, Status> {
let request = request.into_inner();
// NOTE(review): `request.path[0]` panics if the client sends an empty path,
// and `.unwrap()` panics if the file cannot be opened as a Parquet table.
// Both inputs come from the client, so these should map to a `Status` error
// instead of panicking the handler.
let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
let options = arrow::ipc::writer::IpcWriteOptions::default();
// NOTE(review): this call is cut off — its arguments, the returned
// `Response`, and the closing braces of the function are missing.
let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema(
// Executes the SQL text carried in the ticket and streams the result back as
// Flight data: a schema message first, followed by the record-batch messages.
// The query presumably runs against `alltypes_plain.parquet` from the Arrow
// Parquet test data (its registration call is only partly present below).
//
// Errors: a ticket that is not valid UTF-8 yields `Status::invalid_argument`;
// DataFusion failures are converted with `to_tonic_err`; a query that yields
// no batches returns `Status::internal`.
// NOTE(review): the `&self` receiver presumably required by the
// `FlightService` trait is not present in this signature — confirm.
async fn do_get(
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, Status> {
let ticket = request.into_inner();
match String::from_utf8(ticket.ticket.to_vec()) {
Ok(sql) => {
println!("do_get: {}", sql);
// create local execution context
let mut ctx = ExecutionContext::new();
let testdata = arrow::util::test_util::parquet_test_data();
// register parquet file with the execution context
// NOTE(review): only the path argument and the error mapping of the
// registration call survive here; the call itself (presumably
// `ctx.register_parquet(...)`, which would also name the table) is missing.
&format!("{}/alltypes_plain.parquet", testdata),
.map_err(|e| to_tonic_err(&e))?;
// create the query plan
// NOTE(review): `sql` is not used after the `println!` above; the step that
// turns `sql` into a logical plan appears to be missing, leaving
// `.and_then` applied directly to `ctx`.
let plan = ctx
.and_then(|plan| ctx.optimize(&plan))
.and_then(|plan| ctx.create_physical_plan(&plan))
.map_err(|e| to_tonic_err(&e))?;
// execute the query
// `plan` is cloned because it is used again below to obtain the schema.
// All result batches are materialized in memory before anything is sent.
let results =
collect(plan.clone()).await.map_err(|e| to_tonic_err(&e))?;
if results.is_empty() {
return Err(Status::internal("There were no results from ticket"));
// NOTE(review): the `if` block above is never closed with `}`.
// add an initial FlightData message that sends schema
let options = arrow::ipc::writer::IpcWriteOptions::default();
let schema = plan.schema();
// NOTE(review): `schema_flight_data` and `flights` are declared with `=` but
// no right-hand side; their initializers (presumably the schema message built
// from `schema` and `options`, then a vector holding it) are missing.
let schema_flight_data =
let mut flights: Vec<Result<FlightData, Status>> =
// NOTE(review): this conversion of result batches into `FlightData` is
// incomplete: no iterator is taken from `results` before `.flat_map`, the
// call producing `(flight_dictionaries, flight_batch)` is cut off, and the
// closure is never closed.
let mut batches: Vec<Result<FlightData, Status>> = results
.flat_map(|batch| {
let (flight_dictionaries, flight_batch) =
batch, &options,
// append batch vector to schema vector, so that the first message sent is the schema
flights.append(&mut batches);
// The response stream replays the pre-built vector of messages in order.
let output = futures::stream::iter(flights);
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
// NOTE(review): the `Ok(sql)` arm is not closed with `}` before this arm,
// and the `match` and function closing braces are missing after it.
Err(e) => Err(Status::invalid_argument(format!("Invalid ticket: {:?}", e))),
// Handshake RPC: not supported by this example; always returns a
// `Status::unimplemented` error.
// NOTE(review): the `&self` receiver presumably required by the trait and the
// closing `}` of this method appear to be missing.
async fn handshake(
_request: Request<Streaming<HandshakeRequest>>,
) -> Result<Response<Self::HandshakeStream>, Status> {
Err(Status::unimplemented("Not yet implemented"))
// ListFlights RPC: not supported by this example; always returns a
// `Status::unimplemented` error.
// NOTE(review): the `&self` receiver presumably required by the trait and the
// closing `}` of this method appear to be missing.
async fn list_flights(
_request: Request<Criteria>,
) -> Result<Response<Self::ListFlightsStream>, Status> {
Err(Status::unimplemented("Not yet implemented"))
// GetFlightInfo RPC: not supported by this example; always returns a
// `Status::unimplemented` error.
// NOTE(review): the `&self` receiver presumably required by the trait and the
// closing `}` of this method appear to be missing.
async fn get_flight_info(
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented("Not yet implemented"))
// DoPut RPC: uploads are not supported by this example; always returns a
// `Status::unimplemented` error without reading the incoming stream.
// NOTE(review): the `&self` receiver presumably required by the trait and the
// closing `}` of this method appear to be missing.
async fn do_put(
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
Err(Status::unimplemented("Not yet implemented"))
// DoAction RPC: no custom actions are supported; always returns a
// `Status::unimplemented` error.
// NOTE(review): the `&self` receiver presumably required by the trait and the
// closing `}` of this method appear to be missing.
async fn do_action(
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, Status> {
Err(Status::unimplemented("Not yet implemented"))
// ListActions RPC: not supported by this example; always returns a
// `Status::unimplemented` error.
// NOTE(review): the `&self` receiver presumably required by the trait and the
// closing `}` of this method appear to be missing.
async fn list_actions(
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, Status> {
Err(Status::unimplemented("Not yet implemented"))
// DoExchange RPC: bidirectional exchange is not supported by this example;
// always returns a `Status::unimplemented` error.
// NOTE(review): the `&self` receiver presumably required by the trait, the
// closing `}` of this method, and the closing `}` of the enclosing impl
// appear to be missing.
async fn do_exchange(
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, Status> {
Err(Status::unimplemented("Not yet implemented"))
fn to_tonic_err(e: &datafusion::error::DataFusionError) -> Status {
Status::internal(format!("{:?}", e))
/// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for
/// Parquet files and executing SQL queries against them on a remote server.
/// This example is run alongside the example `flight_client`.
// NOTE(review): an `async fn main` needs a runtime attribute (presumably
// `#[tokio::main]`); none is present here — confirm it was not dropped.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// NOTE(review): an empty string is not a valid socket address. If `addr` is
// meant to be a `std::net::SocketAddr` (as tonic's server expects), this
// `parse()` fails and `main` returns an error before the server starts.
// The intended listen address appears to have been lost — confirm.
let addr = "".parse()?;
let service = FlightServiceImpl {};
let svc = FlightServiceServer::new(service);
println!("Listening on {:?}", addr);
// NOTE(review): the code that actually serves `svc` on `addr` (presumably via
// the imported `Server` builder), the final `Ok(())`, and the closing `}`
// are missing; as written, `svc` is never used.