| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #[cfg(not(target_os = "windows"))] |
| mod non_windows { |
| use datafusion::assert_batches_eq; |
| use datafusion_common::instant::Instant; |
| use std::fs::{File, OpenOptions}; |
| use std::io::Write; |
| use std::path::PathBuf; |
| use std::sync::atomic::{AtomicBool, Ordering}; |
| use std::sync::Arc; |
| use std::thread; |
| use std::time::Duration; |
| |
| use arrow::datatypes::{DataType, Field, Schema}; |
| use arrow_schema::SchemaRef; |
| use futures::StreamExt; |
| use nix::sys::stat; |
| use nix::unistd; |
| use tempfile::TempDir; |
| use tokio::task::JoinSet; |
| |
| use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; |
| use datafusion::datasource::TableProvider; |
| use datafusion::prelude::{SessionConfig, SessionContext}; |
| use datafusion_common::{exec_err, Result}; |
| use datafusion_expr::SortExpr; |
| |
| // Number of lines written to FIFO |
| const TEST_BATCH_SIZE: usize = 5; |
| const TEST_DATA_SIZE: usize = 5; |
| |
| /// Makes a TableProvider for a fifo file using `StreamTable` with the `StreamProvider` trait |
| fn fifo_table( |
| schema: SchemaRef, |
| path: impl Into<PathBuf>, |
| sort: Vec<Vec<SortExpr>>, |
| ) -> Arc<dyn TableProvider> { |
| let source = FileStreamProvider::new_file(schema, path.into()) |
| .with_batch_size(TEST_BATCH_SIZE) |
| .with_header(true); |
| let config = StreamConfig::new(Arc::new(source)).with_order(sort); |
| Arc::new(StreamTable::new(Arc::new(config))) |
| } |
| |
| fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> { |
| let file_path = tmp_dir.path().join(file_name); |
| // Simulate an infinite environment via a FIFO file |
| if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) { |
| exec_err!("{}", e) |
| } else { |
| Ok(file_path) |
| } |
| } |
| |
| fn write_to_fifo( |
| mut file: &File, |
| line: &str, |
| ref_time: Instant, |
| broken_pipe_timeout: Duration, |
| ) -> Result<()> { |
| // We need to handle broken pipe error until the reader is ready. This |
| // is why we use a timeout to limit the wait duration for the reader. |
| // If the error is different than broken pipe, we fail immediately. |
| while let Err(e) = file.write_all(line.as_bytes()) { |
| if e.raw_os_error().unwrap() == 32 { |
| let interval = Instant::now().duration_since(ref_time); |
| if interval < broken_pipe_timeout { |
| thread::sleep(Duration::from_millis(100)); |
| continue; |
| } |
| } |
| return exec_err!("{}", e); |
| } |
| Ok(()) |
| } |
| |
| fn create_writing_thread( |
| file_path: PathBuf, |
| maybe_header: Option<String>, |
| lines: Vec<String>, |
| waiting_lock: Arc<AtomicBool>, |
| wait_until: usize, |
| tasks: &mut JoinSet<()>, |
| ) { |
| // Timeout for a long period of BrokenPipe error |
| let broken_pipe_timeout = Duration::from_secs(10); |
| let sa = file_path; |
| // Spawn a new thread to write to the FIFO file |
| #[allow(clippy::disallowed_methods)] // spawn allowed only in tests |
| tasks.spawn_blocking(move || { |
| let file = OpenOptions::new().write(true).open(sa).unwrap(); |
| // Reference time to use when deciding to fail the test |
| let execution_start = Instant::now(); |
| if let Some(header) = maybe_header { |
| write_to_fifo(&file, &header, execution_start, broken_pipe_timeout) |
| .unwrap(); |
| } |
| for (cnt, line) in lines.iter().enumerate() { |
| while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until { |
| thread::sleep(Duration::from_millis(50)); |
| } |
| write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap(); |
| } |
| drop(file); |
| }); |
| } |
| |
| /// This example demonstrates a scanning against an Arrow data source (JSON) and |
| /// fetching results |
| pub async fn main() -> Result<()> { |
| // Create session context |
| let config = SessionConfig::new() |
| .with_batch_size(TEST_BATCH_SIZE) |
| .with_collect_statistics(false) |
| .with_target_partitions(1); |
| let ctx = SessionContext::new_with_config(config); |
| let tmp_dir = TempDir::new()?; |
| let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?; |
| |
| let mut tasks: JoinSet<()> = JoinSet::new(); |
| let waiting = Arc::new(AtomicBool::new(true)); |
| |
| let data_iter = 0..TEST_DATA_SIZE; |
| let lines = data_iter |
| .map(|i| format!("{},{}\n", i, i + 1)) |
| .collect::<Vec<_>>(); |
| |
| create_writing_thread( |
| fifo_path.clone(), |
| Some("a1,a2\n".to_owned()), |
| lines.clone(), |
| waiting.clone(), |
| TEST_DATA_SIZE, |
| &mut tasks, |
| ); |
| |
| // Create schema |
| let schema = Arc::new(Schema::new(vec![ |
| Field::new("a1", DataType::UInt32, false), |
| Field::new("a2", DataType::UInt32, false), |
| ])); |
| |
| // Specify the ordering: |
| let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]]; |
| |
| let provider = fifo_table(schema.clone(), fifo_path, order.clone()); |
| ctx.register_table("fifo", provider)?; |
| |
| let df = ctx.sql("SELECT * FROM fifo").await.unwrap(); |
| let mut stream = df.execute_stream().await.unwrap(); |
| |
| let mut batches = Vec::new(); |
| if let Some(Ok(batch)) = stream.next().await { |
| batches.push(batch) |
| } |
| |
| let expected = vec![ |
| "+----+----+", |
| "| a1 | a2 |", |
| "+----+----+", |
| "| 0 | 1 |", |
| "| 1 | 2 |", |
| "| 2 | 3 |", |
| "| 3 | 4 |", |
| "| 4 | 5 |", |
| "+----+----+", |
| ]; |
| |
| assert_batches_eq!(&expected, &batches); |
| |
| Ok(()) |
| } |
| } |
| |
| #[tokio::main] |
| async fn main() -> datafusion_common::Result<()> { |
| #[cfg(target_os = "windows")] |
| { |
| println!("file_stream_provider example does not work on windows"); |
| Ok(()) |
| } |
| #[cfg(not(target_os = "windows"))] |
| { |
| non_windows::main().await |
| } |
| } |