blob: be2bc0712afbd2deb5df289f074e8a8e34ae189b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Common functions used for testing
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_int32_array;
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
pub mod array_gen;
mod data_gen;
mod string_gen;
pub mod tpcds;
pub mod tpch;
pub use data_gen::AccessLogGenerator;
pub use string_gen::StringBatchGenerator;
pub use env_logger;
/// Flattens the single `i32` column of every batch into one `Vec`, keeping
/// the batches (and the rows within each batch) in their original order.
///
/// Null entries are returned as `None`.
///
/// # Panics
///
/// Panics if any batch does not have exactly one column, or if
/// `as_int32_array` returns an error for that column.
pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
    let mut values = Vec::new();
    for batch in batches {
        assert_eq!(batch.num_columns(), 1);
        let column = as_int32_array(batch.column(0)).unwrap();
        values.extend(column.iter());
    }
    values
}
/// Gathers the `i32` values of every batch in every partition (see
/// [`batches_to_vec`]) and returns them sorted in ascending order.
///
/// Ordering follows `Option<i32>`'s `Ord` implementation, so `None` entries
/// sort before any `Some` value.
pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
    let mut sorted = Vec::new();
    for partition in partitions {
        sorted.extend(batches_to_vec(partition));
    }
    sorted.sort_unstable();
    sorted
}
/// Adds a random number of empty record batches into the stream.
///
/// For every input batch, zero or one empty batch is emitted before it and
/// zero or one empty batch after it. The empty batches use the schema of the
/// first input batch. The relative order of the input batches is preserved.
///
/// `rng` is advanced by two draws per input batch (first the "before" count,
/// then the "after" count), so the output is reproducible for a given seed.
///
/// If `batches` is empty there is no schema to build empty batches from, so
/// the (empty) input is returned unchanged.
pub fn add_empty_batches(
    batches: Vec<RecordBatch>,
    rng: &mut StdRng,
) -> Vec<RecordBatch> {
    // Guard the `batches[0]` access below: indexing an empty Vec would panic.
    if batches.is_empty() {
        return batches;
    }

    // The empty batch is the same for every input batch, so build it once
    // and hand out clones as needed.
    let empty_batch = RecordBatch::new_empty(batches[0].schema());

    batches
        .into_iter()
        .flat_map(|batch| {
            // Draw the counts up front, "before" first and "after" second,
            // so the rng is consumed in a fixed, explicit order.
            let before: usize = rng.random_range(0..2);
            let after: usize = rng.random_range(0..2);
            std::iter::repeat_n(empty_batch.clone(), before)
                .chain(std::iter::once(batch))
                .chain(std::iter::repeat_n(empty_batch.clone(), after))
        })
        .collect()
}
/// "Stagger" a batch: split it into several batches of random sizes.
///
/// For example, if the input batch has 1000 rows, [`stagger_batch`] might
/// return multiple batches
/// ```text
/// [
/// RecordBatch(123 rows),
/// RecordBatch(234 rows),
/// RecordBatch(634 rows),
/// ]
/// ```
///
/// This is [`stagger_batch_with_seed`] with a fixed seed of `42`, so the
/// split is the same on every call for the same input.
pub fn stagger_batch(batch: RecordBatch) -> Vec<RecordBatch> {
    stagger_batch_with_seed(batch, 42)
}
/// "Stagger" a batch: split it into randomly sized batches, using `seed` to
/// initialize the random number generator. See [`stagger_batch`] for more
/// detail.
///
/// Each piece is sliced off the front of what is left of `batch`, with a
/// length drawn from `0` up to and including the number of remaining rows,
/// so individual pieces may have zero rows. The pieces are then passed
/// through [`add_empty_batches`] using the same generator.
pub fn stagger_batch_with_seed(batch: RecordBatch, seed: u64) -> Vec<RecordBatch> {
    // a seeded generator picks the size of each output piece
    let mut rng = StdRng::seed_from_u64(seed);
    let mut pieces = Vec::new();
    let mut rest = batch;
    loop {
        let rows_left = rest.num_rows();
        if rows_left == 0 {
            break;
        }
        let take = rng.random_range(0..rows_left + 1);
        pieces.push(rest.slice(0, take));
        rest = rest.slice(take, rows_left - take);
    }
    add_empty_batches(pieces, &mut rng)
}
/// A table described by its name and its Arrow schema.
pub struct TableDef {
    /// Name of the table.
    pub name: String,
    /// Schema of the table.
    pub schema: Schema,
}

impl TableDef {
    /// Builds a [`TableDef`] from a name (anything convertible into a
    /// `String`) and a schema.
    fn new(name: impl Into<String>, schema: Schema) -> Self {
        let name = name.into();
        Self { name, schema }
    }
}