blob: 516929ebacf19ab3f57b832157461ceaf159d205 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use datafusion::{
dataframe::DataFrame,
error::DataFusionError,
execution::{context::SessionState, TaskContext},
logical_expr::LogicalPlan,
prelude::SessionContext,
};
use object_store::ObjectStore;
use crate::object_storage::{AwsOptions, GcpOptions};
#[async_trait::async_trait]
/// The CLI session context trait provides a way to have a session context that can be used with datafusion's CLI code.
pub trait CliSessionContext {
/// Get an atomic reference counted task context.
fn task_ctx(&self) -> Arc<TaskContext>;
/// Get the session state.
fn session_state(&self) -> SessionState;
/// Register an object store with the session context.
fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>>;
/// Register table options extension from scheme.
fn register_table_options_extension_from_scheme(&self, scheme: &str);
/// Execute a logical plan and return a DataFrame.
async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError>;
}
#[async_trait::async_trait]
impl CliSessionContext for SessionContext {
fn task_ctx(&self) -> Arc<TaskContext> {
self.task_ctx()
}
fn session_state(&self) -> SessionState {
self.state()
}
fn register_object_store(
&self,
url: &url::Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore + 'static>> {
self.register_object_store(url, object_store)
}
fn register_table_options_extension_from_scheme(&self, scheme: &str) {
match scheme {
// For Amazon S3 or Alibaba Cloud OSS
"s3" | "oss" | "cos" => {
// Register AWS specific table options in the session context:
self.register_table_options_extension(AwsOptions::default())
}
// For Google Cloud Storage
"gs" | "gcs" => {
// Register GCP specific table options in the session context:
self.register_table_options_extension(GcpOptions::default())
}
// For unsupported schemes, do nothing:
_ => {}
}
}
async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError> {
self.execute_logical_plan(plan).await
}
}