blob: fc71473445939cfe2cab00eacb12e1b3efca5bd7 [file] [log] [blame]
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io::{BufReader, Read};
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::datafusion_data_access::object_store::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
};
use datafusion::datafusion_data_access::Result;
use datafusion::datafusion_data_access::SizedFile;
use futures::AsyncRead;
use jni::objects::JObject;
use jni::sys::jint;
use crate::jni_bridge::JavaClasses;
use crate::jni_bridge_call_static_method;
use crate::jni_bridge_new_object;
use crate::{jni_bridge_call_method, jni_map_error};
#[derive(Clone)]
pub struct HDFSSingleFileObjectStore;
impl Debug for HDFSSingleFileObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "HDFSObjectStore")
}
}
#[async_trait::async_trait]
impl ObjectStore for HDFSSingleFileObjectStore {
async fn list_file(&self, _prefix: &str) -> Result<FileMetaStream> {
unreachable!()
}
async fn list_dir(
&self,
_prefix: &str,
_delimiter: Option<String>,
) -> Result<ListEntryStream> {
unreachable!()
}
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
log::debug!("HDFSSingleFileStore.file_reader: {:?}", file);
let hdfs_input_stream = Arc::new(
FSDataInputStreamWrapper::try_new(&file.path)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?,
);
Ok(Arc::new(HDFSObjectReader {
file,
hdfs_input_stream,
}))
}
}
#[derive(Clone)]
struct HDFSObjectReader {
file: SizedFile,
hdfs_input_stream: Arc<FSDataInputStreamWrapper>,
}
#[async_trait]
impl ObjectReader for HDFSObjectReader {
async fn chunk_reader(
&self,
_start: u64,
_length: usize,
) -> Result<Box<dyn AsyncRead>> {
unimplemented!()
}
fn sync_chunk_reader(
&self,
start: u64,
_: usize,
) -> Result<Box<dyn Read + Send + Sync>> {
self.get_reader(start)
}
fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
self.sync_chunk_reader(0, 0)
}
fn length(&self) -> u64 {
self.file.size
}
}
impl HDFSObjectReader {
fn get_reader(&self, start: u64) -> Result<Box<dyn Read + Send + Sync>> {
let reader = BufReader::new(HDFSFileReader {
hdfs_input_stream: self.hdfs_input_stream.clone(),
pos: start,
});
Ok(Box::new(reader))
}
}
#[derive(Clone)]
struct HDFSFileReader {
pub hdfs_input_stream: Arc<FSDataInputStreamWrapper>,
pub pos: u64,
}
impl Read for HDFSFileReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
(|| {
let env = JavaClasses::get_thread_jnienv();
log::debug!("HDFSFileReader.read: size={}", buf.len());
let buf = jni_map_error!(env.new_direct_byte_buffer(buf))?;
let read_size = jni_bridge_call_static_method!(
env,
JniBridge.readFSDataInputStream -> jint,
self.hdfs_input_stream.inner,
buf,
self.pos as i64,
)? as usize;
log::debug!("HDFSFileReader.read result: read_size={}", read_size);
self.pos += read_size as u64;
Ok(read_size)
})()
.map_err(|err: Box<dyn std::error::Error>| {
std::io::Error::new(std::io::ErrorKind::Other, format!("{:?}", err))
})
}
}
struct FSDataInputStreamWrapper {
pub inner: JObject<'static>,
env: jni::JNIEnv<'static>,
}
unsafe impl Send for FSDataInputStreamWrapper {}
unsafe impl Sync for FSDataInputStreamWrapper {}
impl FSDataInputStreamWrapper {
pub fn try_new(path: &str) -> Result<FSDataInputStreamWrapper> {
(|| {
let env = JavaClasses::get_thread_jnienv();
let fs = jni_bridge_call_static_method!(env, JniBridge.getHDFSFileSystem -> JObject)?;
let path = jni_bridge_new_object!(env, HadoopPath, jni_map_error!(env.new_string(path))?)?;
let hdfs_input_stream =
jni_bridge_call_method!(env, HadoopFileSystem.open -> JObject, fs, path)?;
Ok(FSDataInputStreamWrapper {
inner: hdfs_input_stream,
env,
})
})().map_err(|err: Box<dyn std::error::Error>| {
std::io::Error::new(std::io::ErrorKind::Other, format!("{:?}", err))
})
}
}
impl Drop for FSDataInputStreamWrapper {
fn drop(&mut self) {
// never panic in drop, otherwise the jvm process will be aborted
let _ = jni_bridge_call_method!(
self.env,
HadoopFSDataInputStream.close -> (),
self.inner
);
}
}