/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
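//! Python bindings (via PyO3) for hudi-rs. This module exposes Hudi tables,
//! file slices, timeline instants, and file group readers to Python, returning
//! record data as PyArrow objects.
//!
//! A minimal usage sketch from the Python side (assuming the `hudi` package
//! re-exports these classes; the import path is not defined in this file):
//!
//! ```python
//! from hudi import HudiTable  # assumed re-export of HudiTable below
//!
//! table = HudiTable("s3://bucket/path/to/table")
//! batches = table.read_snapshot()  # PyArrow data for the latest snapshot
//! ```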
use std::collections::HashMap;
use std::error::Error;
use std::path::PathBuf;
use std::sync::OnceLock;

use arrow::pyarrow::ToPyArrow;
#[cfg(feature = "datafusion")]
use datafusion::error::DataFusionError;
use hudi::error::CoreError;
use hudi::file_group::file_slice::FileSlice;
use hudi::file_group::reader::FileGroupReader;
use hudi::file_group::FileGroup;
use hudi::storage::error::StorageError;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
use hudi::timeline::instant::Instant;
use hudi::timeline::Timeline;
use pyo3::exceptions::PyException;
use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
use tokio::runtime::Runtime;
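
// Exception type raised on the Python side for errors originating in hudi-rs.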
create_exception!(_internal, HudiCoreError, PyException);
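
/// Converts any [`Error`] into the Python-visible [`HudiCoreError`],
/// carrying only the error message across the boundary.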
fn convert_to_py_err<I>(err: I) -> PyErr
where
I: Error,
{
// TODO(xushiyan): match and map all sub types
HudiCoreError::new_err(err.to_string())
}
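
/// Binding-level error wrapper; each variant converts into a Python exception
/// through the `From<PythonError> for PyErr` impl below.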
#[derive(thiserror::Error, Debug)]
pub enum PythonError {
#[error("Error in Hudi core: {0}")]
HudiCore(#[from] CoreError),
#[cfg(feature = "datafusion")]
#[error("Error in Datafusion core: {0}")]
DataFusionCore(#[from] DataFusionError),
}
impl From<PythonError> for PyErr {
fn from(err: PythonError) -> PyErr {
match err {
PythonError::HudiCore(err) => convert_to_py_err(err),
#[cfg(feature = "datafusion")]
PythonError::DataFusionCore(err) => convert_to_py_err(err),
}
}
}
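
/// Python wrapper around [`FileGroupReader`] for reading Hudi file slices as
/// PyArrow data.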
#[cfg(not(tarpaulin_include))]
#[derive(Clone, Debug)]
#[pyclass]
pub struct HudiFileGroupReader {
inner: FileGroupReader,
}
#[cfg(not(tarpaulin_include))]
#[pymethods]
impl HudiFileGroupReader {
#[new]
#[pyo3(signature = (base_uri, options=None))]
fn new_with_options(
base_uri: &str,
options: Option<HashMap<String, String>>,
) -> PyResult<Self> {
let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())
.map_err(PythonError::from)?;
Ok(HudiFileGroupReader { inner })
}
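
    /// Reads the file slice identified by its base file's path relative to the
    /// table base URI, returning the records as a PyArrow object.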
fn read_file_slice_by_base_file_path(
&self,
relative_path: &str,
py: Python,
) -> PyResult<PyObject> {
rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
.map_err(PythonError::from)?
.to_pyarrow(py)
}
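
    /// Reads a [`HudiFileSlice`] (base file plus any log files) and returns the
    /// resulting records as a PyArrow object.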
fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> PyResult<PyObject> {
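        // Rebuild a file group from the slice's base file and log file names so
        // the core reader can resolve the full slice to read.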
let mut file_group = FileGroup::new_with_base_file_name(
&file_slice.base_file_name,
&file_slice.partition_path,
)
.map_err(PythonError::from)?;
let log_file_names = &file_slice.log_file_names;
file_group
.add_log_files_from_names(log_file_names)
.map_err(PythonError::from)?;
let (_, file_slice) = file_group
.file_slices
.iter()
.next()
.ok_or_else(|| {
CoreError::FileGroup(format!(
"Failed to get file slice from file group: {:?}",
file_group
))
})
.map_err(PythonError::from)?;
rt().block_on(self.inner.read_file_slice(file_slice))
.map_err(PythonError::from)?
.to_pyarrow(py)
}
}
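
/// Python-visible view of a Hudi file slice: one base file plus the log files
/// associated with it in the same file group.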
#[cfg(not(tarpaulin_include))]
#[derive(Clone, Debug)]
#[pyclass]
pub struct HudiFileSlice {
#[pyo3(get)]
file_id: String,
#[pyo3(get)]
partition_path: String,
#[pyo3(get)]
creation_instant_time: String,
#[pyo3(get)]
base_file_name: String,
#[pyo3(get)]
base_file_size: usize,
#[pyo3(get)]
base_file_byte_size: i64,
#[pyo3(get)]
log_file_names: Vec<String>,
#[pyo3(get)]
num_records: i64,
}
#[cfg(not(tarpaulin_include))]
#[pymethods]
impl HudiFileSlice {
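    /// Returns the base file's path relative to the table base URI.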
fn base_file_relative_path(&self) -> PyResult<String> {
let path = PathBuf::from(&self.partition_path)
.join(&self.base_file_name)
.to_str()
.map(String::from)
.ok_or_else(|| {
StorageError::InvalidPath(format!(
"Failed to get base file relative path for file slice: {:?}",
self
))
})
.map_err(CoreError::from)
.map_err(PythonError::from)?;
Ok(path)
}
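
    /// Returns each log file's path relative to the table base URI.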
fn log_files_relative_paths(&self) -> PyResult<Vec<String>> {
let mut paths = Vec::<String>::new();
for name in self.log_file_names.iter() {
let p = PathBuf::from(&self.partition_path)
.join(name)
.to_str()
.map(String::from)
.ok_or_else(|| {
StorageError::InvalidPath(format!(
"Failed to get log file relative path for file slice: {:?}",
self
))
})
.map_err(CoreError::from)
.map_err(PythonError::from)?;
            paths.push(p);
}
Ok(paths)
}
}
#[cfg(not(tarpaulin_include))]
impl From<&FileSlice> for HudiFileSlice {
fn from(f: &FileSlice) -> Self {
let file_id = f.file_id().to_string();
let partition_path = f.partition_path.to_string();
let creation_instant_time = f.creation_instant_time().to_string();
let base_file_name = f.base_file.file_name();
let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();
let base_file_size = file_metadata.size;
let base_file_byte_size = file_metadata.byte_size;
let log_file_names = f.log_files.iter().map(|l| l.file_name()).collect();
let num_records = file_metadata.num_records;
HudiFileSlice {
file_id,
partition_path,
creation_instant_time,
base_file_name,
base_file_size,
base_file_byte_size,
log_file_names,
num_records,
}
}
}
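
/// Python wrapper around a timeline [`Instant`]: one action recorded on the table.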
#[cfg(not(tarpaulin_include))]
#[derive(Clone, Debug)]
#[pyclass]
pub struct HudiInstant {
inner: Instant,
}
#[cfg(not(tarpaulin_include))]
#[pymethods]
impl HudiInstant {
#[getter]
pub fn timestamp(&self) -> String {
self.inner.timestamp.to_string()
}
#[getter]
pub fn action(&self) -> String {
self.inner.action.as_ref().to_string()
}
#[getter]
pub fn state(&self) -> String {
self.inner.state.as_ref().to_string()
}
#[getter]
    pub fn epoch_millis(&self) -> i64 {
self.inner.epoch_millis
}
}
impl From<&Instant> for HudiInstant {
fn from(i: &Instant) -> Self {
HudiInstant {
inner: i.to_owned(),
}
}
}
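
/// Python wrapper around a Hudi [`Table`], exposing table metadata, file-slice
/// listings, and snapshot/incremental reads as PyArrow objects.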
#[cfg(not(tarpaulin_include))]
#[pyclass]
pub struct HudiTable {
inner: Table,
}
#[cfg(not(tarpaulin_include))]
#[pymethods]
impl HudiTable {
#[new]
#[pyo3(signature = (base_uri, options=None))]
fn new_with_options(
base_uri: &str,
options: Option<HashMap<String, String>>,
) -> PyResult<Self> {
let inner: Table = rt()
.block_on(Table::new_with_options(
base_uri,
options.unwrap_or_default(),
))
.map_err(PythonError::from)?;
Ok(HudiTable { inner })
}
fn hudi_options(&self) -> HashMap<String, String> {
self.inner.hudi_options()
}
fn storage_options(&self) -> HashMap<String, String> {
self.inner.storage_options()
}
#[getter]
fn table_name(&self) -> String {
self.inner.table_name()
}
#[getter]
fn table_type(&self) -> String {
self.inner.table_type()
}
#[getter]
fn is_mor(&self) -> bool {
self.inner.is_mor()
}
#[getter]
fn timezone(&self) -> String {
self.inner.timezone()
}
fn get_avro_schema(&self, py: Python) -> PyResult<String> {
py.allow_threads(|| {
let avro_schema = rt()
.block_on(self.inner.get_avro_schema())
.map_err(PythonError::from)?;
Ok(avro_schema)
})
}
fn get_schema(&self, py: Python) -> PyResult<PyObject> {
rt().block_on(self.inner.get_schema())
.map_err(PythonError::from)?
.to_pyarrow(py)
}
fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> {
rt().block_on(self.inner.get_partition_schema())
.map_err(PythonError::from)?
.to_pyarrow(py)
}
fn get_timeline(&self, py: Python) -> HudiTimeline {
py.allow_threads(|| {
let timeline = self.inner.get_timeline();
HudiTimeline::from(timeline)
})
}
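
    /// Groups the table's current file slices into `num_splits` batches for
    /// parallel processing, optionally filtered by partition predicates given
    /// as `(field, operator, value)` triples.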
#[pyo3(signature = (num_splits, filters=None))]
fn get_file_slices_splits(
&self,
num_splits: usize,
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
py.allow_threads(|| {
let file_slices = rt()
.block_on(
self.inner
.get_file_slices_splits(num_splits, filters.unwrap_or_default()),
)
.map_err(PythonError::from)?;
Ok(file_slices
.iter()
.map(|inner_vec| inner_vec.iter().map(HudiFileSlice::from).collect())
.collect())
})
}
#[pyo3(signature = (num_splits, timestamp, filters=None))]
fn get_file_slices_splits_as_of(
&self,
num_splits: usize,
timestamp: &str,
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
py.allow_threads(|| {
let file_slices = rt()
.block_on(self.inner.get_file_slices_splits_as_of(
num_splits,
timestamp,
filters.unwrap_or_default(),
))
.map_err(PythonError::from)?;
Ok(file_slices
.iter()
.map(|inner_vec| inner_vec.iter().map(HudiFileSlice::from).collect())
.collect())
})
}
#[pyo3(signature = (filters=None))]
fn get_file_slices(
&self,
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
py.allow_threads(|| {
let file_slices = rt()
.block_on(self.inner.get_file_slices(filters.unwrap_or_default()))
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(HudiFileSlice::from).collect())
})
}
#[pyo3(signature = (timestamp, filters=None))]
fn get_file_slices_as_of(
&self,
timestamp: &str,
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
py.allow_threads(|| {
let file_slices = rt()
.block_on(
self.inner
.get_file_slices_as_of(timestamp, filters.unwrap_or_default()),
)
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(HudiFileSlice::from).collect())
})
}
#[pyo3(signature = (start_timestamp=None, end_timestamp=None))]
fn get_file_slices_between(
&self,
start_timestamp: Option<&str>,
end_timestamp: Option<&str>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
py.allow_threads(|| {
let file_slices = rt()
.block_on(
self.inner
.get_file_slices_between(start_timestamp, end_timestamp),
)
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(HudiFileSlice::from).collect())
})
}
#[pyo3(signature = (options=None))]
fn create_file_group_reader_with_options(
&self,
options: Option<HashMap<String, String>>,
) -> PyResult<HudiFileGroupReader> {
let fg_reader = self
.inner
.create_file_group_reader_with_options(options.unwrap_or_default())
.map_err(PythonError::from)?;
Ok(HudiFileGroupReader { inner: fg_reader })
}
#[pyo3(signature = (filters=None))]
fn read_snapshot(
&self,
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<PyObject> {
rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default()))
.map_err(PythonError::from)?
.to_pyarrow(py)
}
#[pyo3(signature = (timestamp, filters=None))]
fn read_snapshot_as_of(
&self,
timestamp: &str,
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<PyObject> {
rt().block_on(
self.inner
.read_snapshot_as_of(timestamp, filters.unwrap_or_default()),
)
.map_err(PythonError::from)?
.to_pyarrow(py)
}
#[pyo3(signature = (start_timestamp, end_timestamp=None))]
fn read_incremental_records(
&self,
start_timestamp: &str,
end_timestamp: Option<&str>,
py: Python,
) -> PyResult<PyObject> {
rt().block_on(
self.inner
.read_incremental_records(start_timestamp, end_timestamp),
)
.map_err(PythonError::from)?
.to_pyarrow(py)
}
}
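
/// Python wrapper around the table's [`Timeline`] for inspecting completed
/// actions and their metadata.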
#[cfg(not(tarpaulin_include))]
#[pyclass]
pub struct HudiTimeline {
inner: Timeline,
}
#[cfg(not(tarpaulin_include))]
#[pymethods]
impl HudiTimeline {
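    /// Lists completed commit instants, in reverse order when `desc` is true.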
#[pyo3(signature = (desc=false))]
pub fn get_completed_commits(&self, desc: bool, py: Python) -> PyResult<Vec<HudiInstant>> {
py.allow_threads(|| {
let instants = rt()
.block_on(self.inner.get_completed_commits(desc))
.map_err(PythonError::from)?;
Ok(instants.iter().map(HudiInstant::from).collect())
})
}
#[pyo3(signature = (desc=false))]
pub fn get_completed_deltacommits(&self, desc: bool, py: Python) -> PyResult<Vec<HudiInstant>> {
py.allow_threads(|| {
let instants = rt()
.block_on(self.inner.get_completed_deltacommits(desc))
.map_err(PythonError::from)?;
Ok(instants.iter().map(HudiInstant::from).collect())
})
}
#[pyo3(signature = (desc=false))]
pub fn get_completed_replacecommits(
&self,
desc: bool,
py: Python,
) -> PyResult<Vec<HudiInstant>> {
py.allow_threads(|| {
let instants = rt()
.block_on(self.inner.get_completed_replacecommits(desc))
.map_err(PythonError::from)?;
Ok(instants.iter().map(HudiInstant::from).collect())
})
}
#[pyo3(signature = (desc=false))]
pub fn get_completed_clustering_commits(
&self,
desc: bool,
py: Python,
) -> PyResult<Vec<HudiInstant>> {
py.allow_threads(|| {
let instants = rt()
.block_on(self.inner.get_completed_clustering_commits(desc))
.map_err(PythonError::from)?;
Ok(instants.iter().map(HudiInstant::from).collect())
})
}
pub fn get_instant_metadata_in_json(
&self,
instant: &HudiInstant,
py: Python,
) -> PyResult<String> {
py.allow_threads(|| {
let commit_metadata = rt()
.block_on(self.inner.get_instant_metadata_in_json(&instant.inner))
.map_err(PythonError::from)?;
Ok(commit_metadata)
})
}
pub fn get_latest_commit_timestamp(&self, py: Python) -> PyResult<String> {
py.allow_threads(|| {
let commit_timestamp = self
.inner
.get_latest_commit_timestamp()
.map_err(PythonError::from)?;
Ok(commit_timestamp)
})
}
pub fn get_latest_avro_schema(&self, py: Python) -> PyResult<String> {
py.allow_threads(|| {
let schema = rt()
.block_on(self.inner.get_latest_avro_schema())
.map_err(PythonError::from)?;
Ok(schema)
})
}
pub fn get_latest_schema(&self, py: Python) -> PyResult<PyObject> {
rt().block_on(self.inner.get_latest_schema())
.map_err(PythonError::from)?
.to_pyarrow(py)
}
}
impl From<&Timeline> for HudiTimeline {
fn from(t: &Timeline) -> Self {
HudiTimeline {
inner: t.to_owned(),
}
}
}
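
/// Builds a [`HudiTable`] from a base URI plus optional Hudi options, storage
/// options, and a combined options map.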
#[cfg(not(tarpaulin_include))]
#[pyfunction]
#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None, options=None))]
pub fn build_hudi_table(
base_uri: String,
hudi_options: Option<HashMap<String, String>>,
storage_options: Option<HashMap<String, String>>,
options: Option<HashMap<String, String>>,
) -> PyResult<HudiTable> {
let inner = rt()
.block_on(
TableBuilder::from_base_uri(&base_uri)
.with_hudi_options(hudi_options.unwrap_or_default())
.with_storage_options(storage_options.unwrap_or_default())
.with_options(options.unwrap_or_default())
.build(),
)
.map_err(PythonError::from)?;
Ok(HudiTable { inner })
}
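
/// Returns the process-wide Tokio runtime used to block on async hudi-rs calls
/// from these synchronous Python entry points; initialized lazily on first use.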
#[cfg(not(tarpaulin_include))]
pub fn rt() -> &'static Runtime {
static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
}