blob: dfd878275181c4cfef46c1f83f2823de05cd44ff [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Common functions used for testing
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_int32_array;
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
mod data_gen;
pub use data_gen::AccessLogGenerator;
pub use env_logger;
/// Extracts the i32 values from the set of batches and returns them as a single Vec
pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
batches
.iter()
.flat_map(|batch| {
assert_eq!(batch.num_columns(), 1);
as_int32_array(batch.column(0)).unwrap().iter()
})
.collect()
}
/// extract values from batches and sort them
pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
let mut values: Vec<_> = partitions
.iter()
.flat_map(|batches| batches_to_vec(batches).into_iter())
.collect();
values.sort_unstable();
values
}
/// Adds a random number of empty record batches into the stream
pub fn add_empty_batches(
batches: Vec<RecordBatch>,
rng: &mut StdRng,
) -> Vec<RecordBatch> {
let schema = batches[0].schema();
batches
.into_iter()
.flat_map(|batch| {
// insert 0, or 1 empty batches before and after the current batch
let empty_batch = RecordBatch::new_empty(schema.clone());
std::iter::repeat(empty_batch.clone())
.take(rng.gen_range(0..2))
.chain(std::iter::once(batch))
.chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
})
.collect()
}
/// "stagger" batches: split the batches into random sized batches
pub fn stagger_batch(batch: RecordBatch) -> Vec<RecordBatch> {
let seed = 42;
stagger_batch_with_seed(batch, seed)
}
/// "stagger" batches: split the batches into random sized batches
/// using the specified value for a rng seed
pub fn stagger_batch_with_seed(batch: RecordBatch, seed: u64) -> Vec<RecordBatch> {
let mut batches = vec![];
// use a random number generator to pick a random sized output
let mut rng = StdRng::seed_from_u64(seed);
let mut remainder = batch;
while remainder.num_rows() > 0 {
let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
batches.push(remainder.slice(0, batch_size));
remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
}
add_empty_batches(batches, &mut rng)
}