blob: 4409fad6101af14075555a8fba306633e37997a6 [file] [log] [blame]
// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow::{array::ArrayRef, datatypes::FieldRef, record_batch::RecordBatch};
use datafusion::{common::Result, physical_expr::PhysicalExpr};
use crate::{
agg::{create_agg, AggFunction},
window::{
processors::{
agg_processor::AggProcessor, rank_processor::RankProcessor,
row_number_processor::RowNumberProcessor,
},
window_context::WindowContext,
},
};
pub mod processors;
pub mod window_context;
#[derive(Debug, Clone, Copy)]
pub enum WindowFunction {
RankLike(WindowRankType),
Agg(AggFunction),
}
#[derive(Debug, Clone, Copy)]
pub enum WindowRankType {
RowNumber,
Rank,
DenseRank,
}
pub trait WindowFunctionProcessor: Send + Sync {
fn process_batch(&mut self, context: &WindowContext, batch: &RecordBatch) -> Result<ArrayRef>;
fn process_batch_without_partitions(
&mut self,
context: &WindowContext,
batch: &RecordBatch,
) -> Result<ArrayRef>;
}
#[derive(Debug, Clone)]
pub struct WindowExpr {
field: FieldRef,
func: WindowFunction,
children: Vec<Arc<dyn PhysicalExpr>>,
}
impl WindowExpr {
pub fn new(
func: WindowFunction,
children: Vec<Arc<dyn PhysicalExpr>>,
field: FieldRef,
) -> Self {
Self {
field,
func,
children,
}
}
pub fn create_processor(
&self,
context: &Arc<WindowContext>,
) -> Result<Box<dyn WindowFunctionProcessor>> {
match self.func {
WindowFunction::RankLike(WindowRankType::RowNumber) => {
Ok(Box::new(RowNumberProcessor::new()))
}
WindowFunction::RankLike(WindowRankType::Rank) => {
Ok(Box::new(RankProcessor::new(false)))
}
WindowFunction::RankLike(WindowRankType::DenseRank) => {
Ok(Box::new(RankProcessor::new(true)))
}
WindowFunction::Agg(agg_func) => {
let agg = create_agg(agg_func, &self.children, &context.input_schema)?;
Ok(Box::new(AggProcessor::try_new(agg)?))
}
}
}
}