| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Execution plan for reading line-delimited JSON files |
| |
| use std::any::Any; |
| use std::io::{BufReader, Read, Seek, SeekFrom}; |
| use std::sync::Arc; |
| use std::task::Poll; |
| |
| use crate::file_format::JsonDecoder; |
| |
| use datafusion_common::error::{DataFusionError, Result}; |
| use datafusion_common_runtime::JoinSet; |
| use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer}; |
| use datafusion_datasource::file_compression_type::FileCompressionType; |
| use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; |
| use datafusion_datasource::schema_adapter::SchemaAdapterFactory; |
| use datafusion_datasource::{ |
| as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation, |
| TableSchema, |
| }; |
| use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; |
| |
| use arrow::json::ReaderBuilder; |
| use arrow::{datatypes::SchemaRef, json}; |
| use datafusion_common::Statistics; |
| use datafusion_datasource::file::FileSource; |
| use datafusion_datasource::file_scan_config::FileScanConfig; |
| use datafusion_execution::TaskContext; |
| use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; |
| |
| use futures::{StreamExt, TryStreamExt}; |
| use object_store::buffered::BufWriter; |
| use object_store::{GetOptions, GetResultPayload, ObjectStore}; |
| use tokio::io::AsyncWriteExt; |
| |
| /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] |
| pub struct JsonOpener { |
| batch_size: usize, |
| projected_schema: SchemaRef, |
| file_compression_type: FileCompressionType, |
| object_store: Arc<dyn ObjectStore>, |
| } |
| |
| impl JsonOpener { |
| /// Returns a [`JsonOpener`] |
| pub fn new( |
| batch_size: usize, |
| projected_schema: SchemaRef, |
| file_compression_type: FileCompressionType, |
| object_store: Arc<dyn ObjectStore>, |
| ) -> Self { |
| Self { |
| batch_size, |
| projected_schema, |
| file_compression_type, |
| object_store, |
| } |
| } |
| } |
| |
| /// JsonSource holds the extra configuration that is necessary for [`JsonOpener`] |
| #[derive(Clone, Default)] |
| pub struct JsonSource { |
| batch_size: Option<usize>, |
| metrics: ExecutionPlanMetricsSet, |
| projected_statistics: Option<Statistics>, |
| schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, |
| } |
| |
| impl JsonSource { |
| /// Initialize a JsonSource with default values |
| pub fn new() -> Self { |
| Self::default() |
| } |
| } |
| |
| impl From<JsonSource> for Arc<dyn FileSource> { |
| fn from(source: JsonSource) -> Self { |
| as_file_source(source) |
| } |
| } |
| |
| impl FileSource for JsonSource { |
| fn create_file_opener( |
| &self, |
| object_store: Arc<dyn ObjectStore>, |
| base_config: &FileScanConfig, |
| _partition: usize, |
| ) -> Arc<dyn FileOpener> { |
| Arc::new(JsonOpener { |
| batch_size: self |
| .batch_size |
| .expect("Batch size must set before creating opener"), |
| projected_schema: base_config.projected_file_schema(), |
| file_compression_type: base_config.file_compression_type, |
| object_store, |
| }) |
| } |
| |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| |
| fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> { |
| let mut conf = self.clone(); |
| conf.batch_size = Some(batch_size); |
| Arc::new(conf) |
| } |
| |
| fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> { |
| Arc::new(Self { ..self.clone() }) |
| } |
| fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { |
| let mut conf = self.clone(); |
| conf.projected_statistics = Some(statistics); |
| Arc::new(conf) |
| } |
| |
| fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> { |
| Arc::new(Self { ..self.clone() }) |
| } |
| |
| fn metrics(&self) -> &ExecutionPlanMetricsSet { |
| &self.metrics |
| } |
| |
| fn statistics(&self) -> Result<Statistics> { |
| let statistics = &self.projected_statistics; |
| Ok(statistics |
| .clone() |
| .expect("projected_statistics must be set to call")) |
| } |
| |
| fn file_type(&self) -> &str { |
| "json" |
| } |
| |
| fn with_schema_adapter_factory( |
| &self, |
| schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, |
| ) -> Result<Arc<dyn FileSource>> { |
| Ok(Arc::new(Self { |
| schema_adapter_factory: Some(schema_adapter_factory), |
| ..self.clone() |
| })) |
| } |
| |
| fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { |
| self.schema_adapter_factory.clone() |
| } |
| } |
| |
| impl FileOpener for JsonOpener { |
| /// Open a partitioned NDJSON file. |
| /// |
| /// If `file_meta.range` is `None`, the entire file is opened. |
| /// Else `file_meta.range` is `Some(FileRange{start, end})`, which corresponds to the byte range [start, end) within the file. |
| /// |
| /// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules |
| /// are applied to determine which lines to read: |
| /// 1. The first line of the partition is the line in which the index of the first character >= `start`. |
| /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. |
| fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> { |
| let store = Arc::clone(&self.object_store); |
| let schema = Arc::clone(&self.projected_schema); |
| let batch_size = self.batch_size; |
| let file_compression_type = self.file_compression_type.to_owned(); |
| |
| Ok(Box::pin(async move { |
| let calculated_range = |
| calculate_range(&partitioned_file, &store, None).await?; |
| |
| let range = match calculated_range { |
| RangeCalculation::Range(None) => None, |
| RangeCalculation::Range(Some(range)) => Some(range.into()), |
| RangeCalculation::TerminateEarly => { |
| return Ok( |
| futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed() |
| ) |
| } |
| }; |
| |
| let options = GetOptions { |
| range, |
| ..Default::default() |
| }; |
| |
| let result = store |
| .get_opts(&partitioned_file.object_meta.location, options) |
| .await?; |
| |
| match result.payload { |
| #[cfg(not(target_arch = "wasm32"))] |
| GetResultPayload::File(mut file, _) => { |
| let bytes = match partitioned_file.range { |
| None => file_compression_type.convert_read(file)?, |
| Some(_) => { |
| file.seek(SeekFrom::Start(result.range.start as _))?; |
| let limit = result.range.end - result.range.start; |
| file_compression_type.convert_read(file.take(limit as u64))? |
| } |
| }; |
| |
| let reader = ReaderBuilder::new(schema) |
| .with_batch_size(batch_size) |
| .build(BufReader::new(bytes))?; |
| |
| Ok(futures::stream::iter(reader) |
| .map(|r| r.map_err(Into::into)) |
| .boxed()) |
| } |
| GetResultPayload::Stream(s) => { |
| let s = s.map_err(DataFusionError::from); |
| |
| let decoder = ReaderBuilder::new(schema) |
| .with_batch_size(batch_size) |
| .build_decoder()?; |
| let input = file_compression_type.convert_stream(s.boxed())?.fuse(); |
| |
| let stream = deserialize_stream( |
| input, |
| DecoderDeserializer::new(JsonDecoder::new(decoder)), |
| ); |
| Ok(stream.map_err(Into::into).boxed()) |
| } |
| } |
| })) |
| } |
| } |
| |
| pub async fn plan_to_json( |
| task_ctx: Arc<TaskContext>, |
| plan: Arc<dyn ExecutionPlan>, |
| path: impl AsRef<str>, |
| ) -> Result<()> { |
| let path = path.as_ref(); |
| let parsed = ListingTableUrl::parse(path)?; |
| let object_store_url = parsed.object_store(); |
| let store = task_ctx.runtime_env().object_store(&object_store_url)?; |
| let writer_buffer_size = task_ctx |
| .session_config() |
| .options() |
| .execution |
| .objectstore_writer_buffer_size; |
| let mut join_set = JoinSet::new(); |
| for i in 0..plan.output_partitioning().partition_count() { |
| let storeref = Arc::clone(&store); |
| let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan); |
| let filename = format!("{}/part-{i}.json", parsed.prefix()); |
| let file = object_store::path::Path::parse(filename)?; |
| |
| let mut stream = plan.execute(i, Arc::clone(&task_ctx))?; |
| join_set.spawn(async move { |
| let mut buf_writer = |
| BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size); |
| |
| let mut buffer = Vec::with_capacity(1024); |
| while let Some(batch) = stream.next().await.transpose()? { |
| let mut writer = json::LineDelimitedWriter::new(buffer); |
| writer.write(&batch)?; |
| buffer = writer.into_inner(); |
| buf_writer.write_all(&buffer).await?; |
| buffer.clear(); |
| } |
| |
| buf_writer.shutdown().await.map_err(DataFusionError::from) |
| }); |
| } |
| |
| while let Some(result) = join_set.join_next().await { |
| match result { |
| Ok(res) => res?, // propagate DataFusion error |
| Err(e) => { |
| if e.is_panic() { |
| std::panic::resume_unwind(e.into_panic()); |
| } else { |
| unreachable!(); |
| } |
| } |
| } |
| } |
| |
| Ok(()) |
| } |