blob: 0be5eba30c35eea520abd2047bacf941cec1bd60 [file] [log] [blame]
// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
any::Any,
fs::{File, OpenOptions},
io::{BufReader, BufWriter, Cursor, Read, Seek, Write},
sync::Arc,
time::Duration,
};
use blaze_jni_bridge::{
is_jni_bridge_inited, jni_bridge::LocalRef, jni_call, jni_call_static, jni_get_string,
jni_new_direct_byte_buffer, jni_new_global_ref,
};
use datafusion::{common::Result, parquet::file::reader::Length, physical_plan::metrics::Time};
use jni::{objects::GlobalRef, sys::jlong};
use crate::memmgr::metrics::SpillMetrics;
pub type SpillCompressedReader<'a> =
lz4_flex::frame::FrameDecoder<BufReader<Box<dyn Read + Send + 'a>>>;
pub type SpillCompressedWriter<'a> =
lz4_flex::frame::AutoFinishEncoder<BufWriter<Box<dyn Write + Send + 'a>>>;
pub trait Spill: Send + Sync {
fn as_any(&self) -> &dyn Any;
fn get_buf_reader<'a>(&'a self) -> BufReader<Box<dyn Read + Send + 'a>>;
fn get_buf_writer<'a>(&'a mut self) -> BufWriter<Box<dyn Write + Send + 'a>>;
fn get_compressed_reader(&self) -> SpillCompressedReader<'_> {
lz4_flex::frame::FrameDecoder::new(self.get_buf_reader())
}
fn get_compressed_writer(&mut self) -> SpillCompressedWriter<'_> {
lz4_flex::frame::FrameEncoder::new(self.get_buf_writer()).auto_finish()
}
}
impl Spill for Vec<u8> {
fn as_any(&self) -> &dyn Any {
self
}
fn get_buf_reader<'a>(&'a self) -> BufReader<Box<dyn Read + Send + 'a>> {
BufReader::new(Box::new(Cursor::new(self)))
}
fn get_buf_writer<'a>(&'a mut self) -> BufWriter<Box<dyn Write + Send + 'a>> {
BufWriter::new(Box::new(self))
}
}
pub fn try_new_spill(spill_metrics: &SpillMetrics) -> Result<Box<dyn Spill>> {
if !is_jni_bridge_inited() || jni_call_static!(JniBridge.isDriverSide() -> bool)? {
Ok(Box::new(FileSpill::try_new(spill_metrics)?))
} else {
// use on heap spill if on-heap memory is available, otherwise use file spill
let hsm = jni_call_static!(JniBridge.getTaskOnHeapSpillManager() -> JObject)?;
if jni_call!(BlazeOnHeapSpillManager(hsm.as_obj()).isOnHeapAvailable() -> bool)? {
Ok(Box::new(OnHeapSpill::try_new(hsm, spill_metrics)?))
} else {
Ok(Box::new(FileSpill::try_new(spill_metrics)?))
}
}
}
/// A spill structure which write data to temporary files
/// used in driver side or executor side with on-heap memory is full
struct FileSpill(File, SpillMetrics);
impl FileSpill {
fn try_new(spill_metrics: &SpillMetrics) -> Result<Self> {
if is_jni_bridge_inited() {
let file_name = jni_get_string!(
jni_call_static!(JniBridge.getDirectWriteSpillToDiskFile() -> JObject)?
.as_obj()
.into()
)?;
let file = OpenOptions::new() // create file and open under rw mode
.create(true)
.truncate(true)
.write(true)
.read(true)
.open(&file_name)?;
Ok(Self(file, spill_metrics.clone()))
} else {
let file = tempfile::tempfile()?;
Ok(Self(file, spill_metrics.clone()))
}
}
}
impl Spill for FileSpill {
fn as_any(&self) -> &dyn Any {
self
}
fn get_buf_reader<'a>(&'a self) -> BufReader<Box<dyn Read + Send + 'a>> {
let mut file_cloned = self.0.try_clone().expect("File.try_clone() returns error");
file_cloned.sync_data().expect("error synchronizing data");
file_cloned.rewind().expect("error rewinding");
BufReader::with_capacity(
65536,
Box::new(IoTimeReadWrapper(
file_cloned,
self.1.mem_spill_iotime.clone(),
)),
)
}
fn get_buf_writer<'a>(&'a mut self) -> BufWriter<Box<dyn Write + Send + 'a>> {
let file_cloned = self.0.try_clone().expect("File.try_clone() returns error");
BufWriter::with_capacity(
65536,
Box::new(IoTimeWriteWrapper(
file_cloned,
self.1.mem_spill_iotime.clone(),
)),
)
}
}
impl Drop for FileSpill {
fn drop(&mut self) {
self.1.disk_spill_size.add(self.0.len() as usize);
self.1
.disk_spill_iotime
.add_duration(Duration::from_nanos(self.1.mem_spill_iotime.value() as u64))
}
}
/// A spill structure which cooperates with BlazeOnHeapSpillManager
/// used in executor side
struct OnHeapSpill(Arc<RawOnHeapSpill>, SpillMetrics);
impl OnHeapSpill {
fn try_new(hsm: LocalRef, spill_metrics: &SpillMetrics) -> Result<Self> {
let spill_id = jni_call!(BlazeOnHeapSpillManager(hsm.as_obj()).newSpill() -> i32)?;
Ok(Self(
Arc::new(RawOnHeapSpill {
hsm: jni_new_global_ref!(hsm.as_obj())?,
spill_id,
}),
spill_metrics.clone(),
))
}
fn get_disk_usage(&self) -> Result<u64> {
let usage = jni_call!(BlazeOnHeapSpillManager(self.0.hsm.as_obj())
.getSpillDiskUsage(self.0.spill_id) -> jlong)? as u64;
Ok(usage)
}
fn get_disk_iotime(&self) -> Result<u64> {
let iotime = jni_call!(BlazeOnHeapSpillManager(self.0.hsm.as_obj())
.getSpillDiskIOTime(self.0.spill_id) -> jlong)? as u64;
Ok(iotime)
}
}
impl Spill for OnHeapSpill {
fn as_any(&self) -> &dyn Any {
self
}
fn get_buf_reader<'a>(&'a self) -> BufReader<Box<dyn Read + Send + 'a>> {
let cloned = Self(self.0.clone(), self.1.clone());
BufReader::with_capacity(65536, Box::new(cloned))
}
fn get_buf_writer<'a>(&'a mut self) -> BufWriter<Box<dyn Write + Send + 'a>> {
let cloned = Self(self.0.clone(), self.1.clone());
BufWriter::with_capacity(1048576, Box::new(cloned))
}
}
impl Write for OnHeapSpill {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let _timer = self.1.mem_spill_iotime.timer();
let write_len = buf.len();
let buf = jni_new_direct_byte_buffer!(buf)?;
jni_call!(BlazeOnHeapSpillManager(
self.0.hsm.as_obj()).writeSpill(self.0.spill_id, buf.as_obj()) -> ()
)?;
self.1.mem_spill_size.add(write_len);
Ok(write_len)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl Read for OnHeapSpill {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let _timer = self.1.mem_spill_iotime.timer();
let buf = jni_new_direct_byte_buffer!(buf)?;
let read_len = jni_call!(BlazeOnHeapSpillManager(
self.0.hsm.as_obj()).readSpill(self.0.spill_id, buf.as_obj()) -> i32
)?;
Ok(read_len as usize)
}
}
impl Drop for OnHeapSpill {
fn drop(&mut self) {
self.1.mem_spill_count.add(1);
self.1
.disk_spill_size
.add(self.get_disk_usage().unwrap_or(0) as usize);
self.1
.disk_spill_iotime
.add_duration(Duration::from_nanos(self.get_disk_iotime().unwrap_or(0)));
}
}
struct RawOnHeapSpill {
hsm: GlobalRef,
spill_id: i32,
}
impl Drop for RawOnHeapSpill {
fn drop(&mut self) {
let _ = jni_call!(BlazeOnHeapSpillManager(self.hsm.as_obj())
.releaseSpill(self.spill_id) -> ());
}
}
struct IoTimeReadWrapper<R: Read>(R, Time);
struct IoTimeWriteWrapper<W: Write>(W, Time);
impl<R: Read> Read for IoTimeReadWrapper<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let _timer = self.1.timer();
self.0.read(buf)
}
}
impl<W: Write> Write for IoTimeWriteWrapper<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let _timer = self.1.timer();
self.0.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
let _timer = self.1.timer();
self.0.flush()
}
}