// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Defines the external shuffle repartition plan.
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::io::{Cursor, Read};
use std::path::Path;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::arrow::array::*;
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::TimeUnit;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::execution::memory_manager::ConsumerType;
use datafusion::execution::memory_manager::MemoryConsumer;
use datafusion::execution::memory_manager::MemoryConsumerId;
use datafusion::execution::memory_manager::MemoryManager;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::common::batch_byte_size;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::DisplayFormatType;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::Statistics;
use futures::lock::Mutex;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use tempfile::NamedTempFile;
use tokio::task;
use crate::spark_hash::{create_hashes, pmod};
use crate::util::array_builder::{make_batch, new_array_builders};
use crate::util::ipc::write_ipc_compressed;
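/// Write buffer for a single output partition, with three tiers:
/// active array builders -> staging record batches -> frozen bytes.
/// Rows are first appended into `active`, periodically flushed into `staging`
/// batches, and finally concatenated and IPC-compressed into `frozen`, which
/// is what eventually gets written to the shuffle data file.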
struct PartitionBuffer {
schema: SchemaRef,
frozen: Vec<u8>,
staging: Vec<RecordBatch>,
active: Vec<Box<dyn ArrayBuilder>>,
active_slots_mem_size: usize,
num_active_rows: usize,
num_staging_rows: usize,
batch_size: usize,
staging_size: usize,
}
impl PartitionBuffer {
fn new(schema: SchemaRef, batch_size: usize) -> Self {
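// staging batches are kept smaller than the output batch size
// (batch_size / log2(batch_size + 1)), keeping the memory held by
// partially filled active builders lower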
let staging_size = batch_size / (batch_size as f64 + 1.0).log2() as usize;
Self {
schema,
frozen: vec![],
staging: vec![],
active: vec![],
active_slots_mem_size: 0,
num_active_rows: 0,
num_staging_rows: 0,
batch_size,
staging_size,
}
}
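/// Append the rows of `columns` selected by `indices` into the active
/// builders, chunk by chunk, flushing to staging once enough rows have
/// accumulated. Returns the estimated change in memory usage.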
fn append_rows(&mut self, columns: &[ArrayRef], indices: &[usize]) -> Result<isize> {
let mut mem_diff = 0;
let mut start = 0;
// lazy init because some partitions may be empty
if self.active.is_empty() {
self.active = new_array_builders(&self.schema, self.staging_size);
if self.active_slots_mem_size == 0 {
self.active_slots_mem_size = self
.active
.iter()
.zip(self.schema.fields())
.map(|(_ab, field)| slot_size(self.staging_size, field.data_type()))
.sum::<usize>();
}
mem_diff += self.active_slots_mem_size as isize;
}
while start < indices.len() {
let end = (start + self.batch_size).min(indices.len());
self.active
.iter_mut()
.zip(columns)
.for_each(|(builder, column)| {
append_columns(
builder,
column,
&indices[start..end],
column.data_type(),
);
});
self.num_active_rows += end - start;
if self.num_active_rows >= self.staging_size {
mem_diff += self.flush_to_staging()?;
}
start = end;
}
Ok(mem_diff)
}
/// Append a whole batch directly to staging.
/// This breaks the append order when mixed with append_rows(), but it does
/// not affect the shuffle output result.
fn append_batch(&mut self, batch: RecordBatch) -> Result<isize> {
let mut mem_diff = batch_byte_size(&batch) as isize;
self.num_staging_rows += batch.num_rows();
self.staging.push(batch);
// staging -> frozen
if self.num_staging_rows >= self.batch_size {
mem_diff += self.flush()?;
}
Ok(mem_diff)
}
/// flush active data into one staging batch
fn flush_to_staging(&mut self) -> Result<isize> {
if self.num_active_rows == 0 {
return Ok(0);
}
let mut mem_diff = 0isize;
// active -> staging
let active = std::mem::take(&mut self.active);
self.num_active_rows = 0;
mem_diff -= self.active_slots_mem_size as isize;
let staging_batch = make_batch(self.schema.clone(), active)?;
mem_diff += self.append_batch(staging_batch)?;
Ok(mem_diff)
}
/// flush all active and staging data into frozen bytes
fn flush(&mut self) -> Result<isize> {
let mut mem_diff = 0isize;
if self.num_active_rows > 0 {
mem_diff += self.flush_to_staging()?;
}
if self.staging.is_empty() {
return Ok(mem_diff);
}
let frozen_batch = RecordBatch::concat(&self.schema, &self.staging)?;
mem_diff -= self
.staging
.iter()
.map(|batch| batch_byte_size(batch) as isize)
.sum::<isize>();
self.staging.clear();
self.num_staging_rows = 0;
let frozen_capacity_old = self.frozen.capacity();
let mut cursor = Cursor::new(&mut self.frozen);
cursor.seek(SeekFrom::End(0))?;
write_ipc_compressed(&frozen_batch, &mut cursor)?;
mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
Ok(mem_diff)
}
}
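/// Estimated memory footprint, in bytes, of `len` builder slots of the given
/// data type; used only for memory accounting.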
fn slot_size(len: usize, data_type: &DataType) -> usize {
match data_type {
DataType::Boolean => len / 8,
DataType::Int8 => len,
DataType::Int16 => len * 2,
DataType::Int32 => len * 4,
DataType::Int64 => len * 8,
DataType::UInt8 => len,
DataType::UInt16 => len * 2,
DataType::UInt32 => len * 4,
DataType::UInt64 => len * 8,
DataType::Float32 => len * 4,
DataType::Float64 => len * 8,
DataType::Date32 => len * 4,
DataType::Date64 => len * 8,
DataType::Time32(TimeUnit::Second) => len * 4,
DataType::Time32(TimeUnit::Millisecond) => len * 4,
DataType::Time64(TimeUnit::Microsecond) => len * 8,
DataType::Time64(TimeUnit::Nanosecond) => len * 8,
DataType::Utf8 => len * 4,
DataType::LargeUtf8 => len * 8,
DataType::Decimal(_, _) => len * 16,
_ => unimplemented!("data type not supported in shuffle write"),
}
}
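/// Append the values of `from` at the given `indices` into the builder `to`,
/// downcasting both sides according to `data_type`.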
fn append_columns(
to: &mut Box<dyn ArrayBuilder>,
from: &Arc<dyn Array>,
indices: &[usize],
data_type: &DataType,
) {
macro_rules! append {
($arrowty:ident) => {{
type B = paste::paste! {[< $arrowty Builder >]};
type A = paste::paste! {[< $arrowty Array >]};
let t = to.as_any_mut().downcast_mut::<B>().unwrap();
let f = from.as_any().downcast_ref::<A>().unwrap();
for &i in indices {
if f.is_valid(i) {
t.append_value(f.value(i)).unwrap();
} else {
t.append_null().unwrap();
}
}
}};
}
match data_type {
DataType::Boolean => append!(Boolean),
DataType::Int8 => append!(Int8),
DataType::Int16 => append!(Int16),
DataType::Int32 => append!(Int32),
DataType::Int64 => append!(Int64),
DataType::UInt8 => append!(UInt8),
DataType::UInt16 => append!(UInt16),
DataType::UInt32 => append!(UInt32),
DataType::UInt64 => append!(UInt64),
DataType::Float32 => append!(Float32),
DataType::Float64 => append!(Float64),
DataType::Date32 => append!(Date32),
DataType::Date64 => append!(Date64),
DataType::Time32(TimeUnit::Second) => append!(Time32Second),
DataType::Time32(TimeUnit::Millisecond) => append!(Time32Millisecond),
DataType::Time64(TimeUnit::Microsecond) => append!(Time64Microsecond),
DataType::Time64(TimeUnit::Nanosecond) => append!(Time64Nanosecond),
DataType::Utf8 => append!(String),
DataType::LargeUtf8 => append!(LargeString),
DataType::Decimal(_, _) => append!(Decimal),
_ => unimplemented!("data type not supported in shuffle write"),
}
}
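/// A temporary spill file plus the starting byte offset of each partition
/// within it (`offsets` holds `num_output_partitions + 1` entries, the last
/// one being the file length).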
struct SpillInfo {
file: NamedTempFile,
offsets: Vec<u64>,
}
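/// Memory-tracked repartitioner that buffers incoming rows per output
/// partition and spills the buffers to temporary files when the memory
/// manager asks it to free memory.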
struct ShuffleRepartitioner {
id: MemoryConsumerId,
output_data_file: String,
output_index_file: String,
schema: SchemaRef,
buffered_partitions: Mutex<Vec<PartitionBuffer>>,
spills: Mutex<Vec<SpillInfo>>,
/// Partitioning scheme to use
partitioning: Partitioning,
num_output_partitions: usize,
runtime: Arc<RuntimeEnv>,
metrics: BaselineMetrics,
}
impl ShuffleRepartitioner {
#[allow(clippy::too_many_arguments)]
pub fn new(
partition_id: usize,
output_data_file: String,
output_index_file: String,
schema: SchemaRef,
partitioning: Partitioning,
metrics: BaselineMetrics,
runtime: Arc<RuntimeEnv>,
batch_size: usize,
) -> Self {
let num_output_partitions = partitioning.partition_count();
Self {
id: MemoryConsumerId::new(partition_id),
output_data_file,
output_index_file,
schema: schema.clone(),
buffered_partitions: Mutex::new(
(0..num_output_partitions)
.map(|_| PartitionBuffer::new(schema.clone(), batch_size))
.collect::<Vec<_>>(),
),
spills: Mutex::new(vec![]),
partitioning,
num_output_partitions,
runtime,
metrics,
}
}
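/// Hash-partition a single input batch and append its rows to the
/// corresponding partition buffers, adjusting the tracked memory usage by
/// the reported delta.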
async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
if input.num_rows() == 0 {
// skip empty batch
return Ok(());
}
let _timer = self.metrics.elapsed_compute().timer();
// NOTE: in the shuffle writer exec, the output_rows metric represents the
// number of rows that are written to the output data file.
self.metrics.record_output(input.num_rows());
let num_output_partitions = self.num_output_partitions;
match &self.partitioning {
Partitioning::Hash(exprs, _) => {
let hashes_buf = &mut vec![];
let arrays = exprs
.iter()
.map(|expr| Ok(expr.evaluate(&input)?.into_array(input.num_rows())))
.collect::<Result<Vec<_>>>()?;
// use the same seed as Spark's hash partitioning
hashes_buf.resize(arrays[0].len(), 42);
// Hash arrays and compute buckets based on number of partitions
let partition_ids = create_hashes(&arrays, hashes_buf)?
.iter_mut()
.map(|hash| pmod(*hash, num_output_partitions) as u64)
.collect::<Vec<_>>();
// count each partition size
let mut partition_counters = vec![0usize; num_output_partitions];
for &partition_id in &partition_ids {
partition_counters[partition_id as usize] += 1
}
// accumulate partition counters into partition ends
let mut partition_ends = partition_counters;
let mut accum = 0;
partition_ends.iter_mut().for_each(|v| {
*v += accum;
accum = *v;
});
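// e.g. with 3 output partitions and partition_ids = [1, 0, 1, 2]:
// partition_counters = [1, 2, 1] and partition_ends = [1, 3, 4]; the
// reversed loop below then yields shuffled_partition_ids = [1, 0, 2, 3]
// and partition_starts = [0, 1, 3, 4], so the row indices belonging to
// partition p are shuffled_partition_ids[starts[p]..starts[p + 1]]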
// calculate shuffled partition ids
let mut shuffled_partition_ids = vec![0usize; input.num_rows()];
for (index, &partition_id) in partition_ids.iter().enumerate().rev() {
partition_ends[partition_id as usize] -= 1;
let end = partition_ends[partition_id as usize];
shuffled_partition_ids[end] = index;
}
// after calculating, partition ends become partition starts
let mut partition_starts = partition_ends;
partition_starts.push(input.num_rows());
let mut mem_diff = 0;
for (partition_id, (&start, &end)) in partition_starts
.iter()
.tuple_windows()
.enumerate()
.filter(|(_, (start, end))| start < end)
{
let mut buffered_partitions = self.buffered_partitions.lock().await;
let output = &mut buffered_partitions[partition_id];
if end - start < output.batch_size {
mem_diff += output.append_rows(
input.columns(),
&shuffled_partition_ids[start..end],
)?;
std::mem::drop(buffered_partitions);
} else {
// for a bigger slice, use a column-based take() to build the batch and
// append it to the output directly, avoiding the column <-> row
// conversion.
let indices = PrimitiveArray::from_iter(
shuffled_partition_ids[start..end]
.iter()
.map(|&idx| idx as u64),
);
let batch = RecordBatch::try_new(
input.schema(),
input
.columns()
.iter()
.map(|c| {
datafusion::arrow::compute::take(c, &indices, None)
})
.collect::<ArrowResult<Vec<ArrayRef>>>()?,
)?;
mem_diff += output.append_batch(batch)?;
}
}
if mem_diff > 0 {
let mem_increase = mem_diff as usize;
self.try_grow(mem_increase).await?;
self.metrics.mem_used().add(mem_increase);
}
if mem_diff < 0 {
let mem_used = self.metrics.mem_used().value();
let mem_decrease = mem_used.min(-mem_diff as usize);
self.shrink(mem_decrease);
self.metrics.mem_used().set(mem_used - mem_decrease);
}
}
other => {
// this should be unreachable as long as the validation logic
// in the constructor is kept up-to-date
return Err(DataFusionError::NotImplemented(format!(
"Unsupported repartitioning scheme {:?}",
other
)));
}
}
Ok(())
}
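/// Flush all partition buffers and write the final shuffle output: the data
/// file contains each partition's frozen bytes followed by that partition's
/// segments from every spill file, and the index file contains
/// `num_output_partitions + 1` little-endian i64 byte offsets into the data
/// file. Returns an empty stream since the shuffle writer produces no rows.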
async fn shuffle_write(&self) -> Result<SendableRecordBatchStream> {
let _timer = self.metrics.elapsed_compute().timer();
let num_output_partitions = self.num_output_partitions;
let mut buffered_partitions = self.buffered_partitions.lock().await;
let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
for i in 0..num_output_partitions {
buffered_partitions[i].flush()?;
output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen);
}
let mut spills = self.spills.lock().await;
let output_spills = spills.drain(..).collect::<Vec<_>>();
let data_file = self.output_data_file.clone();
let index_file = self.output_index_file.clone();
std::mem::drop(_timer);
let elapsed_compute = self.metrics.elapsed_compute().clone();
task::spawn_blocking(move || {
let _timer = elapsed_compute.timer();
let mut offsets = vec![0; num_output_partitions + 1];
let mut output_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(data_file)?;
for i in 0..num_output_partitions {
offsets[i] = output_data.stream_position()?;
output_data.write_all(&output_batches[i])?;
output_batches[i].clear();
// append this partition's data from each spill file
for spill in &output_spills {
let length = spill.offsets[i + 1] - spill.offsets[i];
if length > 0 {
let mut spill_file = File::open(spill.file.path())?;
spill_file.seek(SeekFrom::Start(spill.offsets[i]))?;
std::io::copy(&mut spill_file.take(length), &mut output_data)?;
}
}
}
output_data.flush()?;
// add one extra offset at the end to ease partition length computation
offsets[num_output_partitions] = output_data.stream_position()?;
let mut output_index = File::create(index_file)?;
for offset in offsets {
output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
}
output_index.flush()?;
Ok::<(), DataFusionError>(())
})
.await
.map_err(|e| {
DataFusionError::Execution(format!("shuffle write error: {:?}", e))
})??;
let used = self.metrics.mem_used().set(0);
self.shrink(used);
// shuffle writer always has empty output
Ok(Box::pin(MemoryStream::try_new(
vec![],
self.schema.clone(),
None,
)?))
}
fn used(&self) -> usize {
self.metrics.mem_used().value()
}
fn spilled_bytes(&self) -> usize {
self.metrics.spilled_bytes().value()
}
fn spill_count(&self) -> usize {
self.metrics.spill_count().value()
}
}
/// Consume the `buffered_partitions` and spill them into a single temporary shuffle output file
async fn spill_into(
buffered_partitions: &mut [PartitionBuffer],
path: &Path,
num_output_partitions: usize,
) -> Result<Vec<u64>> {
let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
for i in 0..num_output_partitions {
buffered_partitions[i].flush()?;
output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen);
}
let path = path.to_owned();
task::spawn_blocking(move || {
let mut offsets = vec![0; num_output_partitions + 1];
let mut spill_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)?;
for i in 0..num_output_partitions {
offsets[i] = spill_data.stream_position()?;
spill_data.write_all(&output_batches[i])?;
output_batches[i].clear();
}
// add one extra offset at the end to ease partition length computation
offsets[num_output_partitions] = spill_data.stream_position()?;
Ok(offsets)
})
.await
.map_err(|e| {
DataFusionError::Execution(format!("Error occurred while spilling {}", e))
})?
}
impl Debug for ShuffleRepartitioner {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("ShuffleRepartitioner")
.field("id", &self.id())
.field("memory_used", &self.used())
.field("spilled_bytes", &self.spilled_bytes())
.field("spilled_count", &self.spill_count())
.finish()
}
}
#[async_trait]
impl MemoryConsumer for ShuffleRepartitioner {
fn name(&self) -> String {
"ShuffleRepartitioner".to_owned()
}
fn id(&self) -> &MemoryConsumerId {
&self.id
}
fn memory_manager(&self) -> Arc<MemoryManager> {
self.runtime.memory_manager.clone()
}
fn type_(&self) -> &ConsumerType {
&ConsumerType::Requesting
}
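/// Spill all buffered partition data to a temporary file and release the
/// memory tracked by this consumer.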
async fn spill(&self) -> Result<usize> {
log::debug!(
"{}[{}] spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
self.name(),
self.id(),
self.used(),
self.spill_count()
);
let mut buffered_partitions = self.buffered_partitions.lock().await;
// as long as we are holding any buffered partitions, we have a chance to free some memory
if buffered_partitions.is_empty() {
return Ok(0);
}
let spillfile = self.runtime.disk_manager.create_tmp_file()?;
let offsets = spill_into(
&mut buffered_partitions,
spillfile.path(),
self.num_output_partitions,
)
.await?;
let mut spills = self.spills.lock().await;
let freed = self.metrics.mem_used().set(0);
self.metrics.record_spill(freed);
spills.push(SpillInfo {
file: spillfile,
offsets,
});
Ok(freed)
}
fn mem_used(&self) -> usize {
self.metrics.mem_used().value()
}
}
impl Drop for ShuffleRepartitioner {
fn drop(&mut self) {
self.runtime.drop_consumer(self.id(), self.used());
}
}
/// The shuffle writer operator maps each input partition to M output partitions based on a
/// partitioning scheme. No guarantees are made about the order of the resulting partitions.
#[derive(Debug)]
pub struct ShuffleWriterExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
/// Partitioning scheme to use
partitioning: Partitioning,
/// Output data file path
output_data_file: String,
/// Output index file path
output_index_file: String,
/// Metrics
metrics: ExecutionPlanMetricsSet,
}
#[async_trait]
impl ExecutionPlan for ShuffleWriterExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
self.input.schema()
}
fn output_partitioning(&self) -> Partitioning {
self.partitioning.clone()
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
match children.len() {
1 => Ok(Arc::new(ShuffleWriterExec::try_new(
children[0].clone(),
self.partitioning.clone(),
self.output_data_file.clone(),
self.output_index_file.clone(),
)?)),
_ => Err(DataFusionError::Internal(
"RepartitionExec wrong number of children".to_string(),
)),
}
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context.clone())?;
let metrics = BaselineMetrics::new(&self.metrics, 0);
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(
external_shuffle(
input,
partition,
self.output_data_file.clone(),
self.output_index_file.clone(),
self.partitioning.clone(),
metrics,
context,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
.try_flatten(),
)))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(f, "ShuffleWriterExec: partitioning={:?}", self.partitioning)
}
}
}
fn statistics(&self) -> Statistics {
self.input.statistics()
}
}
impl ShuffleWriterExec {
/// Create a new ShuffleWriterExec
pub fn try_new(
input: Arc<dyn ExecutionPlan>,
partitioning: Partitioning,
output_data_file: String,
output_index_file: String,
) -> Result<Self> {
Ok(ShuffleWriterExec {
input,
partitioning,
metrics: ExecutionPlanMetricsSet::new(),
output_data_file,
output_index_file,
})
}
}
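/// Drive the whole shuffle write for one input partition: register the
/// repartitioner with the memory manager, feed it every input batch, then
/// write the shuffle data and index files.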
pub async fn external_shuffle(
mut input: SendableRecordBatchStream,
partition_id: usize,
output_data_file: String,
output_index_file: String,
partitioning: Partitioning,
metrics: BaselineMetrics,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let repartitioner = ShuffleRepartitioner::new(
partition_id,
output_data_file,
output_index_file,
schema.clone(),
partitioning,
metrics,
context.runtime_env(),
context.session_config().batch_size(),
);
context.runtime_env().register_requester(repartitioner.id());
while let Some(batch) = input.next().await {
let batch = batch?;
repartitioner.insert_batch(batch).await?;
}
repartitioner.shuffle_write().await
}