blob: b9d757e71a8e61ea21b03cb8327ed7cce12edc80 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use parquet::column::writer::ColumnWriter;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::writer::FileWriter;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;
use std::fs;
use std::path::Path;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;
/// Regression test: writes one row group holding a single REQUIRED BOOLEAN
/// column to a parquet file, then reads every row back on a worker thread.
/// The test fails if the read takes longer than five seconds (a hang), if the
/// reader thread panics, or if the number of rows read back differs from the
/// number written.
#[test]
fn it_writes_data_without_hanging() {
    // Number of boolean values written to the single row group.
    // NOTE(review): 2049 is one past 2048; presumably chosen to cross an
    // internal batch/buffer boundary that once made reading hang — confirm.
    const NUM_ROWS: usize = 2049;

    let path = Path::new("it_writes_data_without_hanging.parquet");
    let message_type = "
message BooleanType {
REQUIRED BOOLEAN DIM0;
}
";
    let schema = Arc::new(parse_message_type(message_type).expect("parse schema"));
    let props = Arc::new(WriterProperties::builder().build());
    let file = fs::File::create(path).expect("create file");
    let mut writer =
        SerializedFileWriter::new(file, schema, props).expect("create parquet writer");

    // Alternating values: `true` at even row indices, `false` at odd ones.
    let my_bool_values: Vec<bool> = (0..NUM_ROWS).map(|i| i % 2 == 0).collect();

    let mut row_group_writer = writer.next_row_group().expect("get row group writer");
    while let Some(mut col_writer) = row_group_writer.next_column().expect("next column") {
        match col_writer {
            ColumnWriter::BoolColumnWriter(ref mut typed_writer) => {
                typed_writer
                    .write_batch(&my_bool_values, None, None)
                    .expect("writing bool column");
            }
            _ => panic!("only test boolean values"),
        }
        row_group_writer
            .close_column(col_writer)
            .expect("close column");
    }
    let rg_md = row_group_writer.close().expect("close row group");
    println!("total rows written: {}", rg_md.num_rows());
    assert_eq!(rg_md.num_rows(), NUM_ROWS as i64, "rows written");
    writer
        .close_row_group(row_group_writer)
        .expect("close row groups");
    writer.close().expect("close writer");

    // A parquet file both starts and ends with the 4-byte magic "PAR1".
    // `starts_with`/`ends_with` also fail cleanly on a truncated file, where
    // slicing `bytes[0..4]` would index-panic instead.
    let bytes = fs::read(path).expect("read file");
    assert!(bytes.starts_with(b"PAR1"), "missing leading PAR1 magic");
    assert!(bytes.ends_with(b"PAR1"), "missing trailing PAR1 magic");

    // Now that we have written our data and are happy with it, make sure we
    // can read it back in < 5 seconds. The read runs on a separate thread so
    // that a hang shows up as a timeout instead of blocking the test forever.
    let (sender, receiver) = mpsc::channel();
    let reader_path = path.to_path_buf();
    // Deliberately not joined: if the reader hangs, joining would hang too.
    let _reader = thread::spawn(move || {
        let file = fs::File::open(&reader_path).expect("open file");
        let reader = SerializedFileReader::new(file).expect("get serialized reader");
        let rows_read = reader.get_row_iter(None).expect("get iterator").count();
        println!("finished reading {} rows", rows_read);
        // The send fails only if the main thread already stopped waiting
        // (timed out); there is nobody left to notify then, so ignore it.
        let _ = sender.send(rows_read);
    });

    // If the reader thread panics, `sender` is dropped without sending, which
    // arrives here as `Disconnected`. That must count as a failure, not as
    // "did not time out".
    match receiver.recv_timeout(Duration::from_millis(5000)) {
        Ok(rows_read) => assert_eq!(rows_read, NUM_ROWS, "rows read back"),
        Err(mpsc::RecvTimeoutError::Timeout) => {
            panic!("reading the parquet file did not finish within 5 seconds")
        }
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            panic!("reader thread exited without reporting (it probably panicked)")
        }
    }
    fs::remove_file(path).expect("remove file");
}