blob: 0420428dc21546704cff12f112a5c4cf4da0d648 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::protobuf::ray_sql_exec_node::PlanType;
use crate::protobuf::{RaySqlExecNode, ShuffleReaderExecNode, ShuffleWriterExecNode};
use crate::shuffle::{ShuffleReaderExec, ShuffleWriterExec};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{DataFusionError, Result};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
use datafusion_proto::protobuf::{self, PhysicalHashRepartition, PhysicalPlanNode};
use prost::Message;
use std::sync::Arc;
#[derive(Debug)]
pub struct ShuffleCodec {}
impl PhysicalExtensionCodec for ShuffleCodec {
fn try_decode(
&self,
buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
registry: &dyn FunctionRegistry,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
// decode bytes to protobuf struct
let node = RaySqlExecNode::decode(buf)
.map_err(|e| DataFusionError::Internal(format!("failed to decode plan: {e:?}")))?;
let extension_codec = DefaultPhysicalExtensionCodec {};
match node.plan_type {
Some(PlanType::ShuffleReader(reader)) => {
let schema = reader.schema.as_ref().unwrap();
let schema: SchemaRef = Arc::new(schema.try_into().unwrap());
let hash_part = parse_protobuf_hash_partitioning(
reader.partitioning.as_ref(),
registry,
&schema,
&extension_codec,
)?;
Ok(Arc::new(ShuffleReaderExec::new(
reader.stage_id as usize,
schema,
hash_part.unwrap(),
&reader.shuffle_dir,
)))
}
Some(PlanType::ShuffleWriter(writer)) => {
let plan = writer.plan.unwrap().try_into_physical_plan(
registry,
&RuntimeEnv::default(),
self,
)?;
let hash_part = parse_protobuf_hash_partitioning(
writer.partitioning.as_ref(),
registry,
plan.schema().as_ref(),
&extension_codec,
)?;
Ok(Arc::new(ShuffleWriterExec::new(
writer.stage_id as usize,
plan,
hash_part.unwrap(),
&writer.shuffle_dir,
)))
}
_ => unreachable!(),
}
}
fn try_encode(
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
) -> Result<(), DataFusionError> {
let plan = if let Some(reader) = node.as_any().downcast_ref::<ShuffleReaderExec>() {
let schema: protobuf::Schema = reader.schema().try_into().unwrap();
let partitioning =
encode_partitioning_scheme(reader.properties().output_partitioning())?;
let reader = ShuffleReaderExecNode {
stage_id: reader.stage_id as u32,
schema: Some(schema),
partitioning: Some(partitioning),
shuffle_dir: reader.shuffle_dir.clone(),
};
PlanType::ShuffleReader(reader)
} else if let Some(writer) = node.as_any().downcast_ref::<ShuffleWriterExec>() {
let plan = PhysicalPlanNode::try_from_physical_plan(writer.input_plan.clone(), self)?;
let partitioning =
encode_partitioning_scheme(writer.properties().output_partitioning())?;
let writer = ShuffleWriterExecNode {
stage_id: writer.stage_id as u32,
plan: Some(plan),
partitioning: Some(partitioning),
shuffle_dir: writer.shuffle_dir.clone(),
};
PlanType::ShuffleWriter(writer)
} else {
unreachable!()
};
plan.encode(buf);
Ok(())
}
}
fn encode_partitioning_scheme(partitioning: &Partitioning) -> Result<PhysicalHashRepartition> {
match partitioning {
Partitioning::Hash(expr, partition_count) => Ok(protobuf::PhysicalHashRepartition {
hash_expr: expr
.iter()
.map(|expr| serialize_physical_expr(expr, &DefaultPhysicalExtensionCodec {}))
.collect::<Result<Vec<_>, DataFusionError>>()?,
partition_count: *partition_count as u64,
}),
Partitioning::UnknownPartitioning(n) => Ok(protobuf::PhysicalHashRepartition {
hash_expr: vec![],
partition_count: *n as u64,
}),
other => Err(DataFusionError::Plan(format!(
"Unsupported shuffle partitioning scheme: {other:?}"
))),
}
}