blob: b5e1bf952e3e503dbc663406e6f558a301adcf48 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::collections::HashMap;
use std::ffi::CString;
use std::sync::Arc;
use datafusion_ffi::table_provider::FFI_TableProvider;
use iceberg::TableIdent;
use iceberg::io::FileIO;
use iceberg::table::StaticTable;
use iceberg_datafusion::table::IcebergTableProvider;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use crate::runtime::runtime;
#[pyclass(name = "IcebergDataFusionTable")]
pub struct PyIcebergDataFusionTable {
inner: Arc<IcebergTableProvider>,
}
#[pymethods]
impl PyIcebergDataFusionTable {
#[new]
fn new(
identifier: Vec<String>,
metadata_location: String,
file_io_properties: Option<HashMap<String, String>>,
) -> PyResult<Self> {
let runtime = runtime();
let provider = runtime.block_on(async {
let table_ident = TableIdent::from_strs(identifier)
.map_err(|e| PyRuntimeError::new_err(format!("Invalid table identifier: {e}")))?;
let mut builder = FileIO::from_path(&metadata_location)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to init FileIO: {e}")))?;
if let Some(props) = file_io_properties {
builder = builder.with_props(props);
}
let file_io = builder
.build()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to build FileIO: {e}")))?;
let static_table =
StaticTable::from_metadata_file(&metadata_location, table_ident, file_io)
.await
.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to load static table: {e}"))
})?;
let table = static_table.into_table();
IcebergTableProvider::try_new_from_table(table)
.await
.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to create table provider: {e}"))
})
})?;
Ok(Self {
inner: Arc::new(provider),
})
}
fn __datafusion_table_provider__<'py>(
&self,
py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
let capsule_name = CString::new("datafusion_table_provider").unwrap();
let ffi_provider = FFI_TableProvider::new(self.inner.clone(), false, Some(runtime()));
PyCapsule::new(py, ffi_provider, Some(capsule_name))
}
}
pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
let this = PyModule::new(py, "datafusion")?;
this.add_class::<PyIcebergDataFusionTable>()?;
m.add_submodule(&this)?;
py.import("sys")?
.getattr("modules")?
.set_item("pyiceberg_core.datafusion", this)?;
Ok(())
}