// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
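
//! Minimal HTTP/1.1 server that generates random Arrow data and streams it to
//! every client as an Arrow IPC stream (`application/vnd.apache.arrow.stream`).
//!
//! The server listens on `127.0.0.1:8008`, ignores the incoming request, and
//! responds with `TOTAL_RECORDS` rows of four nullable `Int64` columns
//! (`a` through `d`), written in batches of `RECORDS_PER_BATCH` rows.
//!
//! One way a client might consume the stream (a sketch, assuming the server is
//! running locally on port 8008): skip the HTTP response headers, then decode
//! the body with `arrow_ipc::reader::StreamReader`.
//!
//! ```no_run
//! use std::io::{BufRead, BufReader, Write};
//! use std::net::TcpStream;
//!
//! use arrow_ipc::reader::StreamReader;
//!
//! let mut stream = TcpStream::connect("127.0.0.1:8008").unwrap();
//! stream
//!     .write_all(b"GET / HTTP/1.1\r\nhost: localhost\r\n\r\n")
//!     .unwrap();
//!
//! // Skip the HTTP response headers: read lines until the bare "\r\n" that
//! // terminates them; the Arrow IPC stream follows immediately after.
//! let mut reader = BufReader::new(stream);
//! let mut line = String::new();
//! while reader.read_line(&mut line).unwrap() > 2 {
//!     line.clear();
//! }
//!
//! // Read record batches until the end-of-stream marker.
//! for batch in StreamReader::try_new(reader, None).unwrap() {
//!     println!("{} rows", batch.unwrap().num_rows());
//! }
//! ```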

use std::{
    io::{BufRead, BufReader, Result, Write},
    net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
    sync::Arc,
    thread,
};
use arrow_array::{Int64Array, RecordBatch};
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{DataType, Field, Fields, Schema};
use once_cell::sync::Lazy;
use rand::{distributions::Standard, prelude::*};
use rayon::{iter, prelude::*};
use tracing::{error, info, info_span};
use tracing_subscriber::fmt::format::FmtSpan;

/// Number of rows in each generated [`RecordBatch`].
const RECORDS_PER_BATCH: usize = 4096;

/// Total number of rows to generate (a smaller count in debug builds).
const TOTAL_RECORDS: usize = if cfg!(debug_assertions) {
    100_000
} else {
    100_000_000
};

/// Schema for random data: four nullable `Int64` columns named `a` through `d`.
static SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
    Arc::new(Schema::new(
        ('a'..='d')
            .map(|field_name| Field::new(field_name, DataType::Int64, true))
            .collect::<Fields>(),
    ))
});

/// Random data: `TOTAL_RECORDS` rows split into batches of `RECORDS_PER_BATCH`
/// rows (plus one final batch holding the remainder), generated in parallel.
static DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
    info_span!("data", TOTAL_RECORDS, RECORDS_PER_BATCH).in_scope(|| {
        info!("Generating random data");
        // Generate record batches with random data. The iterator yields the
        // length of every batch: one entry per full batch, then the remainder.
        iter::repeatn(
            RECORDS_PER_BATCH,
            TOTAL_RECORDS.div_euclid(RECORDS_PER_BATCH),
        )
        .chain(iter::once(TOTAL_RECORDS.rem_euclid(RECORDS_PER_BATCH)))
        .map_init(rand::thread_rng, |rng, len| {
            // Build one batch: a column of `len` random values per field.
            RecordBatch::try_new(
                Arc::clone(&SCHEMA),
                (0..SCHEMA.all_fields().len())
                    .map(|_| {
                        Arc::new(
                            rng.sample_iter::<i64, Standard>(Standard)
                                .take(len)
                                .collect::<Int64Array>(),
                        ) as _
                    })
                    .collect(),
            )
        })
        .flatten()
        .collect()
    })
});

/// Handle one connection: discard the incoming HTTP request, then respond with
/// the generated record batches as an Arrow IPC stream.
fn get_simple(mut stream: std::net::TcpStream) {
    info!("Incoming connection");
    // Ignore the incoming request: read header lines until the blank line that
    // terminates them.
    for _ in BufReader::new(&mut stream)
        .lines()
        .take_while(|line| line.as_ref().is_ok_and(|line| !line.is_empty()))
    {}
    // Write the response header.
    stream
        .write_all(
            "HTTP/1.1 200 OK\r\ncontent-type: application/vnd.apache.arrow.stream\r\n\r\n"
                .as_bytes(),
        )
        .unwrap();
    // Stream the body.
    let mut writer = StreamWriter::try_new(stream, &SCHEMA).unwrap();
    for batch in DATA.iter() {
        writer.write(batch).unwrap();
    }
    writer.finish().unwrap();
    // Recover the TCP stream and close the connection.
    let stream = writer.into_inner().unwrap();
    stream.shutdown(std::net::Shutdown::Both).unwrap();
}

fn main() -> Result<()> {
    // Configure tracing subscriber.
    tracing_subscriber::fmt()
        .with_span_events(FmtSpan::CLOSE)
        .init();
    // Generate random data.
    let _ = Lazy::force(&DATA);
    // Start listening.
    let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8008);
    let listener = TcpListener::bind(bind_addr)?;
    info!(%bind_addr, "Listening");
    // Handle incoming connections.
    loop {
        match listener.accept() {
            Ok((stream, remote_peer)) => {
                thread::spawn(move || {
                    info_span!("Writing Arrow IPC stream", %remote_peer)
                        .in_scope(|| get_simple(stream))
                });
            }
            Err(error) => {
                error!(%error, "Connection failed");
            }
        }
    }
}