| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! A native Rust implementation of [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html) |
| //! for exchanging [Arrow](https://arrow.apache.org) data between processes. |
| //! |
| //! Please see the [arrow-flight crates.io](https://crates.io/crates/arrow-flight) |
| //! page for feature flags and more information. |
| //! |
| //! # Overview |
| //! |
| //! This crate contains: |
| //! |
| //! 1. Low level [prost] generated structs |
| //! for Flight gRPC protobuf messages, such as [`FlightData`], [`FlightInfo`], |
| //! [`Location`] and [`Ticket`]. |
| //! |
| //! 2. Low level [tonic] generated [`flight_service_client`] and |
| //! [`flight_service_server`]. |
| //! |
| //! 3. Support for [Flight SQL] in [`sql`]. Requires the |
| //! `flight-sql` feature of this crate to be activated. |
| //! |
| //! [Flight SQL]: https://arrow.apache.org/docs/format/FlightSql.html |
| |
| #![doc( |
| html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg", |
| html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg" |
| )] |
| #![cfg_attr(docsrs, feature(doc_cfg))] |
| #![allow(rustdoc::invalid_html_tags)] |
| #![warn(missing_docs)] |
| // The unused_crate_dependencies lint does not work well for crates defining additional examples/bin targets |
| #![allow(unused_crate_dependencies)] |
| |
| use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions}; |
| use arrow_schema::{ArrowError, Schema}; |
| |
| use arrow_ipc::convert::try_schema_from_ipc_buffer; |
| use base64::Engine; |
| use base64::prelude::BASE64_STANDARD; |
| use bytes::Bytes; |
| use prost_types::Timestamp; |
| use std::{fmt, ops::Deref}; |
| |
| type ArrowResult<T> = std::result::Result<T, ArrowError>; |
| |
| #[allow(clippy::all)] |
| mod r#gen { |
| // Since this file is auto-generated, we suppress all warnings |
| #![allow(missing_docs)] |
| include!("arrow.flight.protocol.rs"); |
| } |
| |
| /// Defines a `Flight` for generation or retrieval. |
| pub mod flight_descriptor { |
| use super::r#gen; |
| pub use r#gen::flight_descriptor::DescriptorType; |
| } |
| |
| /// Low Level [tonic] [`FlightServiceClient`](gen::flight_service_client::FlightServiceClient). |
| pub mod flight_service_client { |
| use super::r#gen; |
| pub use r#gen::flight_service_client::FlightServiceClient; |
| } |
| |
| /// Low Level [tonic] [`FlightServiceServer`](gen::flight_service_server::FlightServiceServer) |
| /// and [`FlightService`](gen::flight_service_server::FlightService). |
| pub mod flight_service_server { |
| use super::r#gen; |
| pub use r#gen::flight_service_server::FlightService; |
| pub use r#gen::flight_service_server::FlightServiceServer; |
| } |
| |
| /// Mid Level [`FlightClient`] |
| pub mod client; |
| pub use client::FlightClient; |
| |
| /// Decoder to create [`RecordBatch`](arrow_array::RecordBatch) streams from [`FlightData`] streams. |
| /// See [`FlightRecordBatchStream`](decode::FlightRecordBatchStream). |
| pub mod decode; |
| |
| /// Encoder to create [`FlightData`] streams from [`RecordBatch`](arrow_array::RecordBatch) streams. |
| /// See [`FlightDataEncoderBuilder`](encode::FlightDataEncoderBuilder). |
| pub mod encode; |
| |
| /// Common error types |
| pub mod error; |
| |
| pub use r#gen::Action; |
| pub use r#gen::ActionType; |
| pub use r#gen::BasicAuth; |
| pub use r#gen::CancelFlightInfoRequest; |
| pub use r#gen::CancelFlightInfoResult; |
| pub use r#gen::CancelStatus; |
| pub use r#gen::Criteria; |
| pub use r#gen::Empty; |
| pub use r#gen::FlightData; |
| pub use r#gen::FlightDescriptor; |
| pub use r#gen::FlightEndpoint; |
| pub use r#gen::FlightInfo; |
| pub use r#gen::HandshakeRequest; |
| pub use r#gen::HandshakeResponse; |
| pub use r#gen::Location; |
| pub use r#gen::PollInfo; |
| pub use r#gen::PutResult; |
| pub use r#gen::RenewFlightEndpointRequest; |
| pub use r#gen::Result; |
| pub use r#gen::SchemaResult; |
| pub use r#gen::Ticket; |
| |
| /// Helper to extract HTTP/gRPC trailers from a tonic stream. |
| mod trailers; |
| |
| pub mod utils; |
| |
| #[cfg(feature = "flight-sql")] |
| pub mod sql; |
| mod streams; |
| |
| use flight_descriptor::DescriptorType; |
| |
| /// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions |
| pub struct SchemaAsIpc<'a> { |
| /// Data type representing a schema and its IPC write options |
| pub pair: (&'a Schema, &'a IpcWriteOptions), |
| } |
| |
| /// IpcMessage represents a `Schema` in the format expected in |
| /// `FlightInfo.schema` |
| #[derive(Debug)] |
| pub struct IpcMessage(pub Bytes); |
| |
| // Useful conversion functions |
| |
| fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData { |
| let data_gen = writer::IpcDataGenerator::default(); |
| let mut dict_tracker = writer::DictionaryTracker::new(false); |
| data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options) |
| } |
| |
| fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage { |
| let encoded_data = flight_schema_as_encoded_data(schema, options); |
| IpcMessage(encoded_data.ipc_message.into()) |
| } |
| |
| // Implement a bunch of useful traits for various conversions, displays, |
| // etc... |
| |
| // Deref |
| |
| impl Deref for IpcMessage { |
| type Target = [u8]; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.0 |
| } |
| } |
| |
| impl<'a> Deref for SchemaAsIpc<'a> { |
| type Target = (&'a Schema, &'a IpcWriteOptions); |
| |
| fn deref(&self) -> &Self::Target { |
| &self.pair |
| } |
| } |
| |
| // Display... |
| |
| /// Limits the output of value to limit... |
| fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result { |
| if value.len() > limit { |
| write!(f, "{:?}", &value[..limit]) |
| } else { |
| write!(f, "{:?}", &value) |
| } |
| } |
| |
| impl fmt::Display for FlightData { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "FlightData {{")?; |
| write!(f, " descriptor: ")?; |
| match &self.flight_descriptor { |
| Some(d) => write!(f, "{d}")?, |
| None => write!(f, "None")?, |
| }; |
| write!(f, ", header: ")?; |
| limited_fmt(f, &self.data_header, 8)?; |
| write!(f, ", metadata: ")?; |
| limited_fmt(f, &self.app_metadata, 8)?; |
| write!(f, ", body: ")?; |
| limited_fmt(f, &self.data_body, 8)?; |
| write!(f, " }}") |
| } |
| } |
| |
| impl fmt::Display for FlightDescriptor { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "FlightDescriptor {{")?; |
| write!(f, " type: ")?; |
| match self.r#type() { |
| DescriptorType::Cmd => { |
| write!(f, "cmd, value: ")?; |
| limited_fmt(f, &self.cmd, 8)?; |
| } |
| DescriptorType::Path => { |
| write!(f, "path: [")?; |
| let mut sep = ""; |
| for element in &self.path { |
| write!(f, "{sep}{element}")?; |
| sep = ", "; |
| } |
| write!(f, "]")?; |
| } |
| DescriptorType::Unknown => { |
| write!(f, "unknown")?; |
| } |
| } |
| write!(f, " }}") |
| } |
| } |
| |
| impl fmt::Display for FlightEndpoint { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "FlightEndpoint {{")?; |
| write!(f, " ticket: ")?; |
| match &self.ticket { |
| Some(value) => write!(f, "{value}"), |
| None => write!(f, " None"), |
| }?; |
| write!(f, ", location: [")?; |
| let mut sep = ""; |
| for location in &self.location { |
| write!(f, "{sep}{location}")?; |
| sep = ", "; |
| } |
| write!(f, "]")?; |
| write!(f, ", expiration_time:")?; |
| match &self.expiration_time { |
| Some(value) => write!(f, " {value}"), |
| None => write!(f, " None"), |
| }?; |
| write!(f, ", app_metadata: ")?; |
| limited_fmt(f, &self.app_metadata, 8)?; |
| write!(f, " }}") |
| } |
| } |
| |
| impl fmt::Display for FlightInfo { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| let ipc_message = IpcMessage(self.schema.clone()); |
| let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?; |
| write!(f, "FlightInfo {{")?; |
| write!(f, " schema: {schema}")?; |
| write!(f, ", descriptor:")?; |
| match &self.flight_descriptor { |
| Some(d) => write!(f, " {d}"), |
| None => write!(f, " None"), |
| }?; |
| write!(f, ", endpoint: [")?; |
| let mut sep = ""; |
| for endpoint in &self.endpoint { |
| write!(f, "{sep}{endpoint}")?; |
| sep = ", "; |
| } |
| write!(f, "], total_records: {}", self.total_records)?; |
| write!(f, ", total_bytes: {}", self.total_bytes)?; |
| write!(f, ", ordered: {}", self.ordered)?; |
| write!(f, ", app_metadata: ")?; |
| limited_fmt(f, &self.app_metadata, 8)?; |
| write!(f, " }}") |
| } |
| } |
| |
| impl fmt::Display for PollInfo { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "PollInfo {{")?; |
| write!(f, " info:")?; |
| match &self.info { |
| Some(value) => write!(f, " {value}"), |
| None => write!(f, " None"), |
| }?; |
| write!(f, ", descriptor:")?; |
| match &self.flight_descriptor { |
| Some(d) => write!(f, " {d}"), |
| None => write!(f, " None"), |
| }?; |
| write!(f, ", progress:")?; |
| match &self.progress { |
| Some(value) => write!(f, " {value}"), |
| None => write!(f, " None"), |
| }?; |
| write!(f, ", expiration_time:")?; |
| match &self.expiration_time { |
| Some(value) => write!(f, " {value}"), |
| None => write!(f, " None"), |
| }?; |
| write!(f, " }}") |
| } |
| } |
| |
| impl fmt::Display for CancelFlightInfoRequest { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "CancelFlightInfoRequest {{")?; |
| write!(f, " info: ")?; |
| match &self.info { |
| Some(value) => write!(f, "{value}")?, |
| None => write!(f, "None")?, |
| }; |
| write!(f, " }}") |
| } |
| } |
| |
| impl fmt::Display for CancelFlightInfoResult { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "CancelFlightInfoResult {{")?; |
| write!(f, " status: {}", self.status().as_str_name())?; |
| write!(f, " }}") |
| } |
| } |
| |
| impl fmt::Display for RenewFlightEndpointRequest { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "RenewFlightEndpointRequest {{")?; |
| write!(f, " endpoint: ")?; |
| match &self.endpoint { |
| Some(value) => write!(f, "{value}")?, |
| None => write!(f, "None")?, |
| }; |
| write!(f, " }}") |
| } |
| } |
| |
| impl fmt::Display for Location { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "Location {{")?; |
| write!(f, " uri: ")?; |
| write!(f, "{}", self.uri) |
| } |
| } |
| |
| impl fmt::Display for Ticket { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "Ticket {{")?; |
| write!(f, " ticket: ")?; |
| write!(f, "{}", BASE64_STANDARD.encode(&self.ticket)) |
| } |
| } |
| |
| // From... |
| |
| impl From<EncodedData> for FlightData { |
| fn from(data: EncodedData) -> Self { |
| FlightData { |
| data_header: data.ipc_message.into(), |
| data_body: data.arrow_data.into(), |
| ..Default::default() |
| } |
| } |
| } |
| |
| impl From<SchemaAsIpc<'_>> for FlightData { |
| fn from(schema_ipc: SchemaAsIpc) -> Self { |
| let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1); |
| FlightData { |
| data_header: vals, |
| ..Default::default() |
| } |
| } |
| } |
| |
| impl TryFrom<SchemaAsIpc<'_>> for SchemaResult { |
| type Error = ArrowError; |
| |
| fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> { |
| // According to the definition from `Flight.proto` |
| // The schema of the dataset in its IPC form: |
| // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix |
| // 4 bytes - the byte length of the payload |
| // a flatbuffer Message whose header is the Schema |
| let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?; |
| Ok(SchemaResult { schema: vals }) |
| } |
| } |
| |
| impl TryFrom<SchemaAsIpc<'_>> for IpcMessage { |
| type Error = ArrowError; |
| |
| fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> { |
| schema_to_ipc_format(schema_ipc) |
| } |
| } |
| |
| fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> { |
| let pair = *schema_ipc; |
| let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1); |
| |
| let mut schema = vec![]; |
| writer::write_message(&mut schema, encoded_data, pair.1)?; |
| Ok(IpcMessage(schema.into())) |
| } |
| |
| impl TryFrom<&FlightData> for Schema { |
| type Error = ArrowError; |
| fn try_from(data: &FlightData) -> ArrowResult<Self> { |
| convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| { |
| ArrowError::ParseError(format!( |
| "Unable to convert flight data to Arrow schema: {err}" |
| )) |
| }) |
| } |
| } |
| |
| impl TryFrom<FlightInfo> for Schema { |
| type Error = ArrowError; |
| |
| fn try_from(value: FlightInfo) -> ArrowResult<Self> { |
| value.try_decode_schema() |
| } |
| } |
| |
| impl TryFrom<IpcMessage> for Schema { |
| type Error = ArrowError; |
| |
| fn try_from(value: IpcMessage) -> ArrowResult<Self> { |
| try_schema_from_ipc_buffer(&value) |
| } |
| } |
| |
| impl TryFrom<&SchemaResult> for Schema { |
| type Error = ArrowError; |
| fn try_from(data: &SchemaResult) -> ArrowResult<Self> { |
| try_schema_from_ipc_buffer(&data.schema) |
| } |
| } |
| |
| impl TryFrom<SchemaResult> for Schema { |
| type Error = ArrowError; |
| fn try_from(data: SchemaResult) -> ArrowResult<Self> { |
| (&data).try_into() |
| } |
| } |
| |
| // FlightData, FlightDescriptor, etc.. |
| |
| impl FlightData { |
| /// Create a new [`FlightData`]. |
| /// |
| /// # See Also |
| /// |
| /// See [`FlightDataEncoderBuilder`] for a higher level API to |
| /// convert a stream of [`RecordBatch`]es to [`FlightData`]s |
| /// |
| /// # Example: |
| /// |
| /// ``` |
| /// # use bytes::Bytes; |
| /// # use arrow_flight::{FlightData, FlightDescriptor}; |
| /// # fn encode_data() -> Bytes { Bytes::new() } // dummy data |
| /// // Get encoded Arrow IPC data: |
| /// let data_body: Bytes = encode_data(); |
| /// // Create the FlightData message |
| /// let flight_data = FlightData::new() |
| /// .with_descriptor(FlightDescriptor::new_cmd("the command")) |
| /// .with_app_metadata("My apps metadata") |
| /// .with_data_body(data_body); |
| /// ``` |
| /// |
| /// [`FlightDataEncoderBuilder`]: crate::encode::FlightDataEncoderBuilder |
| /// [`RecordBatch`]: arrow_array::RecordBatch |
| pub fn new() -> Self { |
| Default::default() |
| } |
| |
| /// Add a [`FlightDescriptor`] describing the data |
| pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self { |
| self.flight_descriptor = Some(flight_descriptor); |
| self |
| } |
| |
| /// Add a data header |
| pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self { |
| self.data_header = data_header.into(); |
| self |
| } |
| |
| /// Add a data body. See [`IpcDataGenerator`] to create this data. |
| /// |
| /// [`IpcDataGenerator`]: arrow_ipc::writer::IpcDataGenerator |
| pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self { |
| self.data_body = data_body.into(); |
| self |
| } |
| |
| /// Add optional application specific metadata to the message |
| pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self { |
| self.app_metadata = app_metadata.into(); |
| self |
| } |
| } |
| |
| impl FlightDescriptor { |
| /// Create a new opaque command [`CMD`] `FlightDescriptor` to generate a dataset. |
| /// |
| /// [`CMD`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L224-L227 |
| pub fn new_cmd(cmd: impl Into<Bytes>) -> Self { |
| FlightDescriptor { |
| r#type: DescriptorType::Cmd.into(), |
| cmd: cmd.into(), |
| ..Default::default() |
| } |
| } |
| |
| /// Create a new named path [`PATH`] `FlightDescriptor` that identifies a dataset |
| /// |
| /// [`PATH`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L217-L222 |
| pub fn new_path(path: Vec<String>) -> Self { |
| FlightDescriptor { |
| r#type: DescriptorType::Path.into(), |
| path, |
| ..Default::default() |
| } |
| } |
| } |
| |
| impl FlightInfo { |
| /// Create a new, empty `FlightInfo`, describing where to fetch flight data |
| /// |
| /// |
| /// # Example: |
| /// ``` |
| /// # use arrow_flight::{FlightInfo, Ticket, FlightDescriptor, FlightEndpoint}; |
| /// # use arrow_schema::{Schema, Field, DataType}; |
| /// # fn get_schema() -> Schema { |
| /// # Schema::new(vec![ |
| /// # Field::new("a", DataType::Utf8, false), |
| /// # ]) |
| /// # } |
| /// # |
| /// // Create a new FlightInfo |
| /// let flight_info = FlightInfo::new() |
| /// // Encode the Arrow schema |
| /// .try_with_schema(&get_schema()) |
| /// .expect("encoding failed") |
| /// .with_endpoint( |
| /// FlightEndpoint::new() |
| /// .with_ticket(Ticket::new("ticket contents") |
| /// ) |
| /// ) |
| /// .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY")); |
| /// ``` |
| pub fn new() -> FlightInfo { |
| FlightInfo { |
| schema: Bytes::new(), |
| flight_descriptor: None, |
| endpoint: vec![], |
| ordered: false, |
| // Flight says "Set these to -1 if unknown." |
| // |
| // https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L287-L289 |
| total_records: -1, |
| total_bytes: -1, |
| app_metadata: Bytes::new(), |
| } |
| } |
| |
| /// Try and convert the data in this `FlightInfo` into a [`Schema`] |
| pub fn try_decode_schema(self) -> ArrowResult<Schema> { |
| let msg = IpcMessage(self.schema); |
| msg.try_into() |
| } |
| |
| /// Specify the schema for the response. |
| /// |
| /// Note this takes the arrow [`Schema`] (not the IPC schema) and |
| /// encodes it using the default IPC options. |
| /// |
| /// Returns an error if `schema` can not be encoded into IPC form. |
| pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> { |
| let options = IpcWriteOptions::default(); |
| let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?; |
| self.schema = schema; |
| Ok(self) |
| } |
| |
| /// Add specific a endpoint for fetching the data |
| pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self { |
| self.endpoint.push(endpoint); |
| self |
| } |
| |
| /// Add a [`FlightDescriptor`] describing what this data is |
| pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self { |
| self.flight_descriptor = Some(flight_descriptor); |
| self |
| } |
| |
| /// Set the number of records in the result, if known |
| pub fn with_total_records(mut self, total_records: i64) -> Self { |
| self.total_records = total_records; |
| self |
| } |
| |
| /// Set the number of bytes in the result, if known |
| pub fn with_total_bytes(mut self, total_bytes: i64) -> Self { |
| self.total_bytes = total_bytes; |
| self |
| } |
| |
| /// Specify if the response is [ordered] across endpoints |
| /// |
| /// [ordered]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L269-L275 |
| pub fn with_ordered(mut self, ordered: bool) -> Self { |
| self.ordered = ordered; |
| self |
| } |
| |
| /// Add optional application specific metadata to the message |
| pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self { |
| self.app_metadata = app_metadata.into(); |
| self |
| } |
| } |
| |
| impl PollInfo { |
| /// Create a new, empty [`PollInfo`], providing information for a long-running query |
| /// |
| /// # Example: |
| /// ``` |
| /// # use arrow_flight::{FlightInfo, PollInfo, FlightDescriptor}; |
| /// # use prost_types::Timestamp; |
| /// // Create a new PollInfo |
| /// let poll_info = PollInfo::new() |
| /// .with_info(FlightInfo::new()) |
| /// .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY")) |
| /// .try_with_progress(0.5) |
| /// .expect("progress should've been valid") |
| /// .with_expiration_time( |
| /// "1970-01-01".parse().expect("invalid timestamp") |
| /// ); |
| /// ``` |
| pub fn new() -> Self { |
| Self { |
| info: None, |
| flight_descriptor: None, |
| progress: None, |
| expiration_time: None, |
| } |
| } |
| |
| /// Add the current available results for the poll call as a [`FlightInfo`] |
| pub fn with_info(mut self, info: FlightInfo) -> Self { |
| self.info = Some(info); |
| self |
| } |
| |
| /// Add a [`FlightDescriptor`] that the client should use for the next poll call, |
| /// if the query is not yet complete |
| pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self { |
| self.flight_descriptor = Some(flight_descriptor); |
| self |
| } |
| |
| /// Set the query progress if known. Must be in the range [0.0, 1.0] else this will |
| /// return an error |
| pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> { |
| if !(0.0..=1.0).contains(&progress) { |
| return Err(ArrowError::InvalidArgumentError(format!( |
| "PollInfo progress must be in the range [0.0, 1.0], got {progress}" |
| ))); |
| } |
| self.progress = Some(progress); |
| Ok(self) |
| } |
| |
| /// Specify expiration time for this request |
| pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self { |
| self.expiration_time = Some(expiration_time); |
| self |
| } |
| } |
| |
| impl<'a> SchemaAsIpc<'a> { |
| /// Create a new `SchemaAsIpc` from a `Schema` and `IpcWriteOptions` |
| pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self { |
| SchemaAsIpc { |
| pair: (schema, options), |
| } |
| } |
| } |
| |
| impl CancelFlightInfoRequest { |
| /// Create a new [`CancelFlightInfoRequest`], providing the [`FlightInfo`] |
| /// of the query to cancel. |
| pub fn new(info: FlightInfo) -> Self { |
| Self { info: Some(info) } |
| } |
| } |
| |
| impl CancelFlightInfoResult { |
| /// Create a new [`CancelFlightInfoResult`] from the provided [`CancelStatus`]. |
| pub fn new(status: CancelStatus) -> Self { |
| Self { |
| status: status as i32, |
| } |
| } |
| } |
| |
| impl RenewFlightEndpointRequest { |
| /// Create a new [`RenewFlightEndpointRequest`], providing the [`FlightEndpoint`] |
| /// for which is being requested an extension of its expiration. |
| pub fn new(endpoint: FlightEndpoint) -> Self { |
| Self { |
| endpoint: Some(endpoint), |
| } |
| } |
| } |
| |
| impl Action { |
| /// Create a new Action with type and body |
| pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self { |
| Self { |
| r#type: action_type.into(), |
| body: body.into(), |
| } |
| } |
| } |
| |
| impl Result { |
| /// Create a new Result with the specified body |
| pub fn new(body: impl Into<Bytes>) -> Self { |
| Self { body: body.into() } |
| } |
| } |
| |
| impl Ticket { |
| /// Create a new `Ticket` |
| /// |
| /// # Example |
| /// |
| /// ``` |
| /// # use arrow_flight::Ticket; |
| /// let ticket = Ticket::new("SELECT * from FOO"); |
| /// ``` |
| pub fn new(ticket: impl Into<Bytes>) -> Self { |
| Self { |
| ticket: ticket.into(), |
| } |
| } |
| } |
| |
| impl FlightEndpoint { |
| /// Create a new, empty `FlightEndpoint` that represents a location |
| /// to retrieve Flight results. |
| /// |
| /// # Example |
| /// ``` |
| /// # use arrow_flight::{FlightEndpoint, Ticket}; |
| /// # |
| /// // Specify the client should fetch results from this server |
| /// let endpoint = FlightEndpoint::new() |
| /// .with_ticket(Ticket::new("the ticket")); |
| /// |
| /// // Specify the client should fetch results from either |
| /// // `http://example.com` or `https://example.com` |
| /// let endpoint = FlightEndpoint::new() |
| /// .with_ticket(Ticket::new("the ticket")) |
| /// .with_location("http://example.com") |
| /// .with_location("https://example.com"); |
| /// ``` |
| pub fn new() -> FlightEndpoint { |
| Default::default() |
| } |
| |
| /// Set the [`Ticket`] used to retrieve data from the endpoint |
| pub fn with_ticket(mut self, ticket: Ticket) -> Self { |
| self.ticket = Some(ticket); |
| self |
| } |
| |
| /// Add a location `uri` to this endpoint. Note each endpoint can |
| /// have multiple locations. |
| /// |
| /// If no `uri` is specified, the [Flight Spec] says: |
| /// |
| /// ```text |
| /// * If the list is empty, the expectation is that the ticket can only |
| /// * be redeemed on the current service where the ticket was |
| /// * generated. |
| /// ``` |
| /// [Flight Spec]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L307C2-L312 |
| pub fn with_location(mut self, uri: impl Into<String>) -> Self { |
| self.location.push(Location { uri: uri.into() }); |
| self |
| } |
| |
| /// Specify expiration time for this stream |
| pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self { |
| self.expiration_time = Some(expiration_time); |
| self |
| } |
| |
| /// Add optional application specific metadata to the message |
| pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self { |
| self.app_metadata = app_metadata.into(); |
| self |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use arrow_ipc::MetadataVersion; |
| use arrow_schema::{DataType, Field, TimeUnit}; |
| |
| struct TestVector(Vec<u8>, usize); |
| |
| impl fmt::Display for TestVector { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| limited_fmt(f, &self.0, self.1) |
| } |
| } |
| |
| #[test] |
| fn it_creates_flight_descriptor_command() { |
| let expected_cmd = "my_command".as_bytes(); |
| let fd = FlightDescriptor::new_cmd(expected_cmd.to_vec()); |
| assert_eq!(fd.r#type(), DescriptorType::Cmd); |
| assert_eq!(fd.cmd, expected_cmd.to_vec()); |
| } |
| |
| #[test] |
| fn it_accepts_equal_output() { |
| let input = TestVector(vec![91; 10], 10); |
| |
| let actual = format!("{input}"); |
| let expected = format!("{:?}", vec![91; 10]); |
| assert_eq!(actual, expected); |
| } |
| |
| #[test] |
| fn it_accepts_short_output() { |
| let input = TestVector(vec![91; 6], 10); |
| |
| let actual = format!("{input}"); |
| let expected = format!("{:?}", vec![91; 6]); |
| assert_eq!(actual, expected); |
| } |
| |
| #[test] |
| fn it_accepts_long_output() { |
| let input = TestVector(vec![91; 10], 9); |
| |
| let actual = format!("{input}"); |
| let expected = format!("{:?}", vec![91; 9]); |
| assert_eq!(actual, expected); |
| } |
| |
| #[test] |
| fn ser_deser_schema_result() { |
| let schema = Schema::new(vec![ |
| Field::new("c1", DataType::Utf8, false), |
| Field::new("c2", DataType::Float64, true), |
| Field::new("c3", DataType::UInt32, false), |
| Field::new("c4", DataType::Boolean, true), |
| Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true), |
| Field::new("c6", DataType::Time32(TimeUnit::Second), false), |
| ]); |
| // V5 with write_legacy_ipc_format = false |
| // this will write the continuation marker |
| let option = IpcWriteOptions::default(); |
| let schema_ipc = SchemaAsIpc::new(&schema, &option); |
| let result: SchemaResult = schema_ipc.try_into().unwrap(); |
| let des_schema: Schema = (&result).try_into().unwrap(); |
| assert_eq!(schema, des_schema); |
| |
| // V4 with write_legacy_ipc_format = true |
| // this will not write the continuation marker |
| let option = IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(); |
| let schema_ipc = SchemaAsIpc::new(&schema, &option); |
| let result: SchemaResult = schema_ipc.try_into().unwrap(); |
| let des_schema: Schema = (&result).try_into().unwrap(); |
| assert_eq!(schema, des_schema); |
| } |
| |
| #[test] |
| fn test_dict_schema() { |
| // Test for https://github.com/apache/arrow-rs/issues/7058 |
| let schema = Schema::new(vec![ |
| Field::new( |
| "a", |
| DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), |
| false, |
| ), |
| Field::new( |
| "b", |
| DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), |
| false, |
| ), |
| ]); |
| |
| let flight_info = FlightInfo::new().try_with_schema(&schema).unwrap(); |
| |
| let new_schema = Schema::try_from(flight_info).unwrap(); |
| assert_eq!(schema, new_schema); |
| } |
| } |