blob: 135f8ecec373d89054151fadaab7e4bf22d56f92 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use adbc_core::PartitionedResult;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::Schema;
use sedona::{context::SedonaContext, reader::SedonaStreamReader};
use std::sync::Arc;
use tokio::runtime::Runtime;
use adbc_core::{
error::{Error, Result, Status},
options::{OptionStatement, OptionValue},
Optionable, Statement,
};
use crate::{err_not_implemented, err_unrecognized_option, utils::from_datafusion_error};
/// A statement that holds SQL text and runs it against a shared [`SedonaContext`].
///
/// The statement starts with no query; `set_sql_query` stores one, and the
/// `execute*` methods report an invalid-state error until that has happened.
pub struct SedonaStatement {
/// Tokio runtime used to drive the asynchronous context calls to completion
/// from the synchronous `Statement` methods (via `block_on`).
runtime: Arc<Runtime>,
/// Shared Sedona context that the SQL text is submitted to.
ctx: Arc<SedonaContext>,
/// SQL text supplied through `set_sql_query`; `None` until a query is set.
sql_query: Option<String>,
}
impl SedonaStatement {
pub(crate) fn new(runtime: Arc<Runtime>, ctx: Arc<SedonaContext>) -> SedonaStatement {
Self {
runtime,
ctx,
sql_query: None,
}
}
}
/// Option handling for [`SedonaStatement`].
///
/// No statement-level options are handled here: every setter and getter
/// forwards the key straight to the crate's `err_unrecognized_option!` macro.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it
// produces an `Err` that names the rejected key -- confirm against its definition.
impl Optionable for SedonaStatement {
type Option = OptionStatement;
/// Rejects `key`; the supplied value is ignored.
fn set_option(&mut self, key: Self::Option, _value: OptionValue) -> Result<()> {
err_unrecognized_option!(key)
}
/// Rejects `key`; no string-valued options are available.
fn get_option_string(&self, key: Self::Option) -> Result<String> {
err_unrecognized_option!(key)
}
/// Rejects `key`; no bytes-valued options are available.
fn get_option_bytes(&self, key: Self::Option) -> Result<Vec<u8>> {
err_unrecognized_option!(key)
}
/// Rejects `key`; no integer-valued options are available.
fn get_option_int(&self, key: Self::Option) -> Result<i64> {
err_unrecognized_option!(key)
}
/// Rejects `key`; no double-valued options are available.
fn get_option_double(&self, key: Self::Option) -> Result<f64> {
err_unrecognized_option!(key)
}
}
impl Statement for SedonaStatement {
fn set_sql_query(&mut self, query: impl AsRef<str>) -> Result<()> {
self.sql_query = Some(query.as_ref().to_string());
Ok(())
}
fn prepare(&mut self) -> Result<()> {
Ok(())
}
fn execute_schema(&mut self) -> Result<Schema> {
if let Some(query) = self.sql_query.clone() {
self.runtime.block_on(async {
let df = self.ctx.sql(&query).await.map_err(from_datafusion_error)?;
Ok(df.schema().as_arrow().clone())
})
} else {
Err(Error::with_message_and_status(
"query not set yet",
Status::InvalidState,
))
}
}
fn execute(&mut self) -> Result<impl RecordBatchReader + Send> {
if let Some(query) = self.sql_query.clone() {
self.runtime.block_on(async {
let df = self.ctx.sql(&query).await.map_err(from_datafusion_error)?;
let stream = df.execute_stream().await.map_err(from_datafusion_error)?;
Ok(SedonaStreamReader::new(self.runtime.clone(), stream))
})
} else {
Err(Error::with_message_and_status(
"query not set yet",
Status::InvalidState,
))
}
}
fn execute_update(&mut self) -> Result<Option<i64>> {
err_not_implemented!()
}
fn execute_partitions(&mut self) -> Result<PartitionedResult> {
err_not_implemented!()
}
fn bind(&mut self, _batch: RecordBatch) -> Result<()> {
err_not_implemented!()
}
fn bind_stream(&mut self, _reader: Box<dyn RecordBatchReader + Send>) -> Result<()> {
err_not_implemented!()
}
fn get_parameter_schema(&self) -> Result<Schema> {
err_not_implemented!()
}
fn set_substrait_plan(&mut self, _plan: impl AsRef<[u8]>) -> Result<()> {
err_not_implemented!()
}
fn cancel(&mut self) -> Result<()> {
err_not_implemented!()
}
}
#[cfg(test)]
mod test {
    use adbc_core::{Connection, Database, Driver, Statement};
    use arrow_array::RecordBatch;
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::assert_batches_eq;

    use crate::driver::SedonaDriver;

    /// End-to-end check of a statement: errors before a query is set, then the
    /// schema and the rendered result of a simple spatial query.
    #[test]
    fn statement() {
        let mut statement = SedonaDriver::default()
            .new_database()
            .unwrap()
            .new_connection()
            .unwrap()
            .new_statement()
            .unwrap();

        // Can't execute_schema() or execute() before setting a query
        let maybe_err = statement.execute_schema();
        assert_eq!(maybe_err.err().unwrap().message, "query not set yet");
        let maybe_err = statement.execute();
        assert_eq!(maybe_err.err().unwrap().message, "query not set yet");

        statement
            .set_sql_query("SELECT ST_AsText(ST_Point(1, 2)) AS geom")
            .unwrap();
        statement.prepare().unwrap();

        assert_eq!(
            statement.execute_schema().unwrap(),
            Schema::new(vec![Field::new("geom", DataType::Utf8, true)])
        );

        let batches: Result<Vec<RecordBatch>, _> = statement.execute().unwrap().collect();
        let batches = batches.unwrap();
        // Every row of the rendered table has the same width: the header cell
        // is padded to the width of the widest value, "POINT(1 2)".
        assert_batches_eq!(
            [
                "+------------+",
                "| geom       |",
                "+------------+",
                "| POINT(1 2) |",
                "+------------+",
            ],
            &batches
        );
    }
}