| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::pin::Pin; |
| |
| use futures::Stream; |
| use tonic::transport::Server; |
| use tonic::{Request, Response, Status, Streaming}; |
| |
| use datafusion::datasource::parquet::ParquetTable; |
| use datafusion::datasource::TableProvider; |
| use datafusion::execution::context::ExecutionContext; |
| |
| use flight::{ |
| flight_service_server::FlightService, flight_service_server::FlightServiceServer, |
| Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, |
| HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, |
| }; |
| |
/// Request handler for the example DataFusion-backed Flight server.
///
/// The handler has no fields, so it carries no per-server state; every RPC builds
/// whatever it needs (e.g. an execution context) on demand.
#[derive(Clone, Debug, Default)]
pub struct FlightServiceImpl {}
| |
| /** |
| * Example Flight Server wrapping DataFusion that supports looking up schema information for |
| * Parquet files and executing SQL queries against Parquet files. |
| */ |
| #[tonic::async_trait] |
| impl FlightService for FlightServiceImpl { |
| type HandshakeStream = Pin< |
| Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + Sync + 'static>, |
| >; |
| type ListFlightsStream = |
| Pin<Box<dyn Stream<Item = Result<FlightInfo, Status>> + Send + Sync + 'static>>; |
| type DoGetStream = |
| Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>; |
| type DoPutStream = |
| Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + Sync + 'static>>; |
| type DoActionStream = Pin< |
| Box<dyn Stream<Item = Result<flight::Result, Status>> + Send + Sync + 'static>, |
| >; |
| type ListActionsStream = |
| Pin<Box<dyn Stream<Item = Result<ActionType, Status>> + Send + Sync + 'static>>; |
| type DoExchangeStream = |
| Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>; |
| |
| async fn get_schema( |
| &self, |
| request: Request<FlightDescriptor>, |
| ) -> Result<Response<SchemaResult>, Status> { |
| let request = request.into_inner(); |
| |
| let table = ParquetTable::try_new(&request.path[0]).unwrap(); |
| |
| Ok(Response::new(SchemaResult::from(table.schema().as_ref()))) |
| } |
| |
| async fn do_get( |
| &self, |
| request: Request<Ticket>, |
| ) -> Result<Response<Self::DoGetStream>, Status> { |
| let ticket = request.into_inner(); |
| match String::from_utf8(ticket.ticket.to_vec()) { |
| Ok(sql) => { |
| println!("do_get: {}", sql); |
| |
| // create local execution context |
| let mut ctx = ExecutionContext::new(); |
| |
| let testdata = std::env::var("PARQUET_TEST_DATA") |
| .expect("PARQUET_TEST_DATA not defined"); |
| |
| // register parquet file with the execution context |
| ctx.register_parquet( |
| "alltypes_plain", |
| &format!("{}/alltypes_plain.parquet", testdata), |
| ) |
| .map_err(|e| to_tonic_err(&e))?; |
| |
| // create the query plan |
| let plan = ctx |
| .create_logical_plan(&sql) |
| .and_then(|plan| ctx.optimize(&plan)) |
| .and_then(|plan| ctx.create_physical_plan(&plan, 1024 * 1024)) |
| .map_err(|e| to_tonic_err(&e))?; |
| |
| // execute the query |
| let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?; |
| if results.is_empty() { |
| return Err(Status::internal("There were no results from ticket")); |
| } |
| |
| // add an initial FlightData message that sends schema |
| let schema = plan.schema(); |
| let mut flights: Vec<Result<FlightData, Status>> = |
| vec![Ok(FlightData::from(schema.as_ref()))]; |
| |
| let mut batches: Vec<Result<FlightData, Status>> = results |
| .iter() |
| .map(|batch| Ok(FlightData::from(batch))) |
| .collect(); |
| |
| // append batch vector to schema vector, so that the first message sent is the schema |
| flights.append(&mut batches); |
| |
| let output = futures::stream::iter(flights); |
| |
| Ok(Response::new(Box::pin(output) as Self::DoGetStream)) |
| } |
| Err(e) => Err(Status::invalid_argument(format!("Invalid ticket: {:?}", e))), |
| } |
| } |
| |
| async fn handshake( |
| &self, |
| _request: Request<Streaming<HandshakeRequest>>, |
| ) -> Result<Response<Self::HandshakeStream>, Status> { |
| Err(Status::unimplemented("Not yet implemented")) |
| } |
| |
| async fn list_flights( |
| &self, |
| _request: Request<Criteria>, |
| ) -> Result<Response<Self::ListFlightsStream>, Status> { |
| Err(Status::unimplemented("Not yet implemented")) |
| } |
| |
| async fn get_flight_info( |
| &self, |
| _request: Request<FlightDescriptor>, |
| ) -> Result<Response<FlightInfo>, Status> { |
| Err(Status::unimplemented("Not yet implemented")) |
| } |
| |
| async fn do_put( |
| &self, |
| _request: Request<Streaming<FlightData>>, |
| ) -> Result<Response<Self::DoPutStream>, Status> { |
| Err(Status::unimplemented("Not yet implemented")) |
| } |
| |
| async fn do_action( |
| &self, |
| _request: Request<Action>, |
| ) -> Result<Response<Self::DoActionStream>, Status> { |
| Err(Status::unimplemented("Not yet implemented")) |
| } |
| |
| async fn list_actions( |
| &self, |
| _request: Request<Empty>, |
| ) -> Result<Response<Self::ListActionsStream>, Status> { |
| Err(Status::unimplemented("Not yet implemented")) |
| } |
| |
| async fn do_exchange( |
| &self, |
| _request: Request<Streaming<FlightData>>, |
| ) -> Result<Response<Self::DoExchangeStream>, Status> { |
| Err(Status::unimplemented("Not yet implemented")) |
| } |
| } |
| |
| fn to_tonic_err(e: &datafusion::error::ExecutionError) -> Status { |
| Status::internal(format!("{:?}", e)) |
| } |
| |
| #[tokio::main] |
| async fn main() -> Result<(), Box<dyn std::error::Error>> { |
| let addr = "0.0.0.0:50051".parse()?; |
| let service = FlightServiceImpl {}; |
| |
| let svc = FlightServiceServer::new(service); |
| |
| println!("Listening on {:?}", addr); |
| |
| Server::builder().add_service(svc).serve(addr).await?; |
| |
| Ok(()) |
| } |