blob: 4d938728e8ef61c26a94ca25cb898efa9344a281 [file] [log] [blame]
// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow::{array::*, record_batch::RecordBatch};
use datafusion::{common::Result, physical_expr::PhysicalExpr};
use itertools::Itertools;
use crate::generate::{GeneratedRows, Generator};
/// Generator that expands a list-typed expression into one output row per
/// list element (see the `Generator` impl's `eval`).
#[derive(Debug)]
pub struct ExplodeArray {
/// Expression evaluated against each input batch; `eval` reads its result
/// as a list array.
child: Arc<dyn PhysicalExpr>,
/// When `true`, `eval` emits a zero-based element position column ahead of
/// the element column.
position: bool,
}
impl ExplodeArray {
pub fn new(child: Arc<dyn PhysicalExpr>, position: bool) -> Self {
Self { child, position }
}
}
impl Generator for ExplodeArray {
fn exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
}
fn with_new_exprs(&self, exprs: Vec<Arc<dyn PhysicalExpr>>) -> Result<Arc<dyn Generator>> {
Ok(Arc::new(Self {
child: exprs[0].clone(),
position: self.position,
}))
}
fn eval(&self, batch: &RecordBatch) -> Result<GeneratedRows> {
let input_array = self.child.evaluate(batch)?.into_array(batch.num_rows())?;
let list = as_list_array(&input_array);
let value_offsets = list.value_offsets();
let mut orig_row_id_builder = Int32Builder::new();
let mut pos_builder = Int32Builder::new();
// build row_id and pos arrays
for (orig_row_id, (&start, &end)) in value_offsets.iter().tuple_windows().enumerate() {
if list.is_valid(orig_row_id) && start < end {
for i in start..end {
orig_row_id_builder.append_value(orig_row_id as i32);
pos_builder.append_value((i - start) as i32);
}
}
}
let orig_row_ids = orig_row_id_builder.finish();
let values = list
.values()
.slice(value_offsets[0] as usize, orig_row_ids.len());
let cols = if self.position {
vec![Arc::new(pos_builder.finish()), values]
} else {
vec![values]
};
Ok(GeneratedRows { orig_row_ids, cols })
}
}
/// Generator that expands a map-typed expression into one output row per
/// map entry (see the `Generator` impl's `eval`).
#[derive(Debug)]
pub struct ExplodeMap {
/// Expression evaluated against each input batch; `eval` reads its result
/// as a map array.
child: Arc<dyn PhysicalExpr>,
/// When `true`, `eval` emits a zero-based entry position column ahead of
/// the key and value columns.
position: bool,
}
impl ExplodeMap {
pub fn new(child: Arc<dyn PhysicalExpr>, position: bool) -> Self {
Self { child, position }
}
}
impl Generator for ExplodeMap {
fn exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
}
fn with_new_exprs(&self, exprs: Vec<Arc<dyn PhysicalExpr>>) -> Result<Arc<dyn Generator>> {
Ok(Arc::new(Self {
child: exprs[0].clone(),
position: self.position,
}))
}
fn eval(&self, batch: &RecordBatch) -> Result<GeneratedRows> {
let input_array = self.child.evaluate(batch)?.into_array(batch.num_rows())?;
let map = as_map_array(&input_array);
let value_offsets = map.value_offsets();
let mut orig_row_id_builder = Int32Builder::new();
let mut pos_builder = Int32Builder::new();
// build row_id and pos arrays
for (orig_row_id, (&start, &end)) in value_offsets.iter().tuple_windows().enumerate() {
if map.is_valid(orig_row_id) && start < end {
for i in start..end {
orig_row_id_builder.append_value(orig_row_id as i32);
pos_builder.append_value((i - start) as i32);
}
}
}
let orig_row_ids = orig_row_id_builder.finish();
let keys = map
.keys()
.slice(value_offsets[0] as usize, orig_row_ids.len());
let values = map
.values()
.slice(value_offsets[0] as usize, orig_row_ids.len());
let cols = if self.position {
vec![Arc::new(pos_builder.finish()), keys, values]
} else {
vec![keys, values]
};
Ok(GeneratedRows { orig_row_ids, cols })
}
}