blob: f5ed8510a79e8e1eb11ac3e77665fdb9ff598413 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::ops::Range;
use std::sync::Arc;
use arrow::array::{
Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
TimestampNanosecondBuilder, UInt16Builder,
};
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
/// Controls the shape of the generated access-log data.
///
/// Every `Range` is half-open (`start..end`); counts are drawn from it at
/// random while generating.
#[derive(Debug, Clone)]
struct GeneratorOptions {
    /// Maximum number of rows to generate (used as the per-batch cap when
    /// copied into a batch builder).
    row_limit: usize,
    /// Range for the number of pods on each host.
    pods_per_host: Range<usize>,
    /// Range for the number of containers in each pod.
    containers_per_pod: Range<usize>,
    /// Range for the number of log entries per container.
    entries_per_container: Range<usize>,
}

impl Default for GeneratorOptions {
    /// No row limit, 1 to 14 pods per host, 1 to 2 containers per pod and
    /// 1024 to 8191 entries per container.
    fn default() -> Self {
        let row_limit = usize::MAX;
        let pods_per_host = Range { start: 1, end: 15 };
        let containers_per_pod = Range { start: 1, end: 3 };
        let entries_per_container = Range {
            start: 1024,
            end: 8192,
        };

        GeneratorOptions {
            row_limit,
            pods_per_host,
            containers_per_pod,
            entries_per_container,
        }
    }
}
#[derive(Default)]
struct BatchBuilder {
service: StringDictionaryBuilder<Int32Type>,
host: StringDictionaryBuilder<Int32Type>,
pod: StringDictionaryBuilder<Int32Type>,
container: StringDictionaryBuilder<Int32Type>,
image: StringDictionaryBuilder<Int32Type>,
time: TimestampNanosecondBuilder,
client_addr: StringBuilder,
request_duration: Int32Builder,
request_user_agent: StringBuilder,
request_method: StringBuilder,
request_host: StringBuilder,
request_bytes: Int32Builder,
response_bytes: Int32Builder,
response_status: UInt16Builder,
prices_status: Decimal128Builder,
options: GeneratorOptions,
row_count: usize,
}
impl BatchBuilder {
fn schema() -> SchemaRef {
let utf8_dict =
|| DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
Arc::new(Schema::new(vec![
Field::new("service", utf8_dict(), true),
Field::new("host", utf8_dict(), false),
Field::new("pod", utf8_dict(), false),
Field::new("container", utf8_dict(), false),
Field::new("image", utf8_dict(), false),
Field::new(
"time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("client_addr", DataType::Utf8, true),
Field::new("request_duration_ns", DataType::Int32, false),
Field::new("request_user_agent", DataType::Utf8, true),
Field::new("request_method", DataType::Utf8, true),
Field::new("request_host", DataType::Utf8, true),
Field::new("request_bytes", DataType::Int32, true),
Field::new("response_bytes", DataType::Int32, true),
Field::new("response_status", DataType::UInt16, false),
Field::new("decimal_price", DataType::Decimal128(38, 0), false),
]))
}
fn is_finished(&self) -> bool {
self.options.row_limit <= self.row_count
}
fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) {
let num_pods = rng.gen_range(self.options.pods_per_host.clone());
let pods = generate_sorted_strings(rng, num_pods, 30..40);
for pod in pods {
let num_containers = rng.gen_range(self.options.containers_per_pod.clone());
for container_idx in 0..num_containers {
let container = format!("{service}_container_{container_idx}");
let image = format!(
"{container}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9"
);
let num_entries =
rng.gen_range(self.options.entries_per_container.clone());
for i in 0..num_entries {
if self.is_finished() {
return;
}
let time = i as i64 * 1024;
self.append_row(rng, host, &pod, service, &container, &image, time);
}
}
}
}
#[allow(clippy::too_many_arguments)]
fn append_row(
&mut self,
rng: &mut StdRng,
host: &str,
pod: &str,
service: &str,
container: &str,
image: &str,
time: i64,
) {
self.row_count += 1;
let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"];
let status = &[200, 204, 400, 503, 403];
self.service.append(service).unwrap();
self.host.append(host).unwrap();
self.pod.append(pod).unwrap();
self.container.append(container).unwrap();
self.image.append(image).unwrap();
self.time.append_value(time);
self.client_addr.append_value(format!(
"{}.{}.{}.{}",
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>()
));
self.request_duration.append_value(rng.gen());
self.request_user_agent
.append_value(random_string(rng, 20..100));
self.request_method
.append_value(methods[rng.gen_range(0..methods.len())]);
self.request_host
.append_value(format!("https://{service}.mydomain.com"));
self.request_bytes
.append_option(rng.gen_bool(0.9).then(|| rng.gen()));
self.response_bytes
.append_option(rng.gen_bool(0.9).then(|| rng.gen()));
self.response_status
.append_value(status[rng.gen_range(0..status.len())]);
self.prices_status.append_value(self.row_count as i128);
}
fn finish(mut self, schema: SchemaRef) -> RecordBatch {
RecordBatch::try_new(
schema,
vec![
Arc::new(self.service.finish()),
Arc::new(self.host.finish()),
Arc::new(self.pod.finish()),
Arc::new(self.container.finish()),
Arc::new(self.image.finish()),
Arc::new(self.time.finish()),
Arc::new(self.client_addr.finish()),
Arc::new(self.request_duration.finish()),
Arc::new(self.request_user_agent.finish()),
Arc::new(self.request_method.finish()),
Arc::new(self.request_host.finish()),
Arc::new(self.request_bytes.finish()),
Arc::new(self.response_bytes.finish()),
Arc::new(self.response_status.finish()),
Arc::new(
self.prices_status
.finish()
.with_precision_and_scale(38, 0)
.unwrap(),
),
],
)
.unwrap()
}
}
/// Returns a random string of lowercase ASCII letters (`a`..=`z`).
///
/// The length is drawn from `len_range` (upper bound exclusive).
///
/// # Panics
///
/// Panics if `len_range` is empty.
fn random_string(rng: &mut StdRng, len_range: Range<usize>) -> String {
    let len = rng.gen_range(len_range);
    let mut out = String::with_capacity(len);
    for _ in 0..len {
        out.push(char::from(rng.gen_range(b'a'..=b'z')));
    }
    out
}
/// Generates `count` random lowercase strings, each with a length drawn from
/// `str_len`, and returns them in ascending lexicographic order.
fn generate_sorted_strings(
    rng: &mut StdRng,
    count: usize,
    str_len: Range<usize>,
) -> Vec<String> {
    let mut generated = Vec::with_capacity(count);
    for _ in 0..count {
        generated.push(random_string(rng, str_len.clone()));
    }
    // Equal strings are indistinguishable, so an unstable sort is sufficient.
    generated.sort_unstable();
    generated
}
/// Iterator that generates [`RecordBatch`]es of randomly generated data with
/// an access log style schema, for tracing or monitoring type use cases.
///
/// This is useful for writing tests and queries against such data.
///
/// Each batch contains the rows for a single host. Within a batch, rows are
/// grouped by service, then by pod (pod names are generated in sorted order),
/// then by container, and `time` increases within each container. The random
/// number generator starts from a fixed seed, so generators configured the
/// same way yield the same data. A batch may contain no rows if every service
/// is randomly skipped for its host.
///
/// Here are the columns with example data:
///
/// ```text
/// service: 'backend'
/// host: 'i-1ec3ca3151468928.ec2.internal'
/// pod: 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
/// container: 'backend_container_0'
/// image: 'backend_container_0@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9'
/// time: '1970-01-01 00:00:00'
/// client_addr: '127.216.178.64'
/// request_duration_ns: -1261239112
/// request_user_agent: 'kxttrfiiietlsaygzphhwlqcgngnumuphliejmxfdznuurswhdcicrlprbnocibvsbukiohjjbjdygwbfhxqvurm'
/// request_method: 'PUT'
/// request_host: 'https://backend.mydomain.com'
/// request_bytes: -312099516
/// response_bytes: 1448834362
/// response_status: 200
/// decimal_price: 1
/// ```
#[derive(Debug)]
pub struct AccessLogGenerator {
    /// Schema shared by every produced batch
    schema: SchemaRef,
    /// Seeded random number generator that drives all generated values
    rng: StdRng,
    /// Index of the next host to generate; used to derive its hostname
    host_idx: usize,
    /// maximum rows per batch
    max_batch_size: usize,
    /// How many rows have been returned so far
    row_count: usize,
    /// Shape of the generated data: total row limit and the ranges for the
    /// number of pods, containers and entries
    options: GeneratorOptions,
}
impl Default for AccessLogGenerator {
fn default() -> Self {
Self::new()
}
}
impl AccessLogGenerator {
pub fn new() -> Self {
let seed = [
1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 6, 0, 0,
0, 0, 0, 5, 0, 0, 0, 0, 0,
];
Self {
schema: BatchBuilder::schema(),
host_idx: 0,
rng: StdRng::from_seed(seed),
max_batch_size: usize::MAX,
row_count: 0,
options: Default::default(),
}
}
/// Return the schema of the [`RecordBatch`]es created
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
/// Limit the maximum batch size
pub fn with_max_batch_size(mut self, batch_size: usize) -> Self {
self.max_batch_size = batch_size;
self
}
/// Return up to row_limit rows;
pub fn with_row_limit(mut self, row_limit: usize) -> Self {
self.options.row_limit = row_limit;
self
}
/// Set the number of pods per host
pub fn with_pods_per_host(mut self, range: Range<usize>) -> Self {
self.options.pods_per_host = range;
self
}
/// Set the number of containers per pod
pub fn with_containers_per_pod(mut self, range: Range<usize>) -> Self {
self.options.containers_per_pod = range;
self
}
/// Set the number of log entries per container
pub fn with_entries_per_container(mut self, range: Range<usize>) -> Self {
self.options.entries_per_container = range;
self
}
}
impl Iterator for AccessLogGenerator {
    type Item = RecordBatch;

    /// Generates a batch containing the rows for the next host.
    ///
    /// Each batch holds at most `max_batch_size` rows, and never more rows
    /// than remain under the overall row limit. Returns `None` once the row
    /// limit has been reached. It also returns `None` if the maximum batch
    /// size is zero, because the iterator would otherwise yield empty batches
    /// forever. A batch may still be empty when every service happens to be
    /// skipped for its host.
    fn next(&mut self) -> Option<Self::Item> {
        let remaining = self.options.row_limit.saturating_sub(self.row_count);
        let row_limit = self.max_batch_size.min(remaining);
        if row_limit == 0 {
            return None;
        }

        let mut builder = BatchBuilder {
            options: GeneratorOptions {
                row_limit,
                ..self.options.clone()
            },
            ..Default::default()
        };

        // Derive a deterministic hostname from the host index. Compute in u64
        // with wrapping arithmetic: this cannot panic on overflow in debug
        // builds, and the 64-bit constants do not depend on `usize`'s width.
        // For non-overflowing indices the result matches plain arithmetic.
        let host_id = (self.host_idx as u64)
            .wrapping_mul(0x7d87f8ed5c5)
            .wrapping_add(0x1ec3ca3151468928);
        let host = format!("i-{host_id:016x}.ec2.internal");
        self.host_idx += 1;

        for service in &["frontend", "backend", "database", "cache"] {
            // Each service runs on a given host with probability 1/2.
            if self.rng.gen_bool(0.5) {
                continue;
            }
            if builder.is_finished() {
                break;
            }
            builder.append(&mut self.rng, &host, service);
        }

        let batch = builder.finish(Arc::clone(&self.schema));

        self.row_count += batch.num_rows();
        Some(batch)
    }
}