parquet: improve BOOLEAN writing logic and report error on encoding fail (#443)

* improve BOOLEAN writing logic and report error on encoding fail

Writing more than 2048 rows of BOOLEAN data overflows the hard-coded
256-byte buffer (256 * 8 = 2048 bits) set for the bit_writer in the
PlainEncoder. Once this occurs, further attempts to write to the encoder
fail because capacity is exceeded, but the errors are silently ignored.

This fix improves error detection and reporting at the point of
encoding and modifies the bit-writing logic for BOOLEAN data. The
bit_writer is initially allocated 256 bytes (as at present), then each
time the capacity would be exceeded it is extended by another
256 bytes.
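
The growth policy described above can be sketched in isolation. The
block below is a minimal illustration, not the parquet crate's code:
`SketchBitWriter`, `put_bit` and `put_bools` are hypothetical names, and
the sketch grows the buffer when a write fails rather than checking
capacity up front as the patch does.

```rust
/// Hypothetical stand-in for a fixed-capacity bit writer.
/// This is NOT the parquet crate's `BitWriter`.
struct SketchBitWriter {
    buffer: Vec<u8>,
    max_bytes: usize,
    bits_written: usize,
}

/// Growth step in bytes, matching the 256 bytes named in the message.
const INCREMENT: usize = 256;

impl SketchBitWriter {
    fn new(max_bytes: usize) -> Self {
        SketchBitWriter {
            buffer: vec![0; max_bytes],
            max_bytes,
            bits_written: 0,
        }
    }

    /// Bytes touched so far, counting a partially filled byte as one.
    fn bytes_written(&self) -> usize {
        (self.bits_written + 7) / 8
    }

    fn capacity(&self) -> usize {
        self.max_bytes
    }

    /// Grows the buffer by `increment` zeroed bytes.
    fn extend(&mut self, increment: usize) {
        self.max_bytes += increment;
        self.buffer.resize(self.max_bytes, 0);
    }

    /// Writes one bit, least-significant bit first within each byte.
    /// Returns false, writing nothing, when the buffer is full.
    fn put_bit(&mut self, bit: bool) -> bool {
        if self.bits_written >= self.max_bytes * 8 {
            return false;
        }
        if bit {
            self.buffer[self.bits_written / 8] |= 1 << (self.bits_written % 8);
        }
        self.bits_written += 1;
        true
    }
}

/// Writes every value, growing by INCREMENT whenever a write fails, and
/// reports an error instead of silently dropping a value.
fn put_bools(writer: &mut SketchBitWriter, values: &[bool]) -> Result<(), String> {
    for &value in values {
        if !writer.put_bit(value) {
            writer.extend(INCREMENT);
            if !writer.put_bit(value) {
                return Err("unable to put boolean value".to_string());
            }
        }
    }
    Ok(())
}
```

With a 256-byte starting capacity, 2048 values fit exactly and the
2049th triggers one extension, so the sketch ends with a capacity of
512 bytes.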

This certainly resolves the current problem, but it's not exactly a
great fix because the capacity of the bit_writer could now grow
substantially.

Other data types seem to have a more sophisticated mechanism for writing
data which doesn't involve growing or having a fixed size buffer. It
would be desirable to make the BOOLEAN type use this same mechanism if
possible, but that level of change is more intrusive and probably
requires greater knowledge of the implementation than I possess.

resolves: #349

* only manipulate the bit_writer for BOOLEAN data

Tacky, but I can't think of a better way to do this without
specialization.

* better isolation of changes

Remove the byte tracking from the PlainEncoder and use the existing
bytes_written() method in BitWriter.

This is neater.

* add test for boolean writer

The test ensures that we can write > 2048 rows to a parquet file and
that when we read the data back, it finishes without hanging (defined as
taking < 5 seconds).

If we don't want that extra complexity, we could remove the
thread/channel stuff and just try to read the file and let the test
runner terminate hanging tests.
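
For reference, the thread/channel pattern the test relies on can be
reduced to a small helper. This is a sketch under assumptions:
`run_with_timeout` is a hypothetical name and is not part of this
change. Unlike the test, it treats a worker that panics (channel
disconnected) the same as a timeout.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `work` on a separate thread and returns its result, or `None`
/// if no result arrives within `timeout`. A worker that panics drops
/// the sender, which also yields `None`.
fn run_with_timeout<T, F>(work: F, timeout: Duration) -> Option<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already have given up, so a failed send is
        // deliberately ignored.
        let _ = sender.send(work());
    });
    // The worker thread is detached: on timeout it keeps running in the
    // background until it finishes or the process exits.
    receiver.recv_timeout(timeout).ok()
}
```

A test could wrap the read-back step in such a helper and assert that
the result is `Some(..)`, which would fail on both a hang and a panic
in the reader thread.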

* fix capacity calculation error in bool encoding

values.len() reports the number of values to be encoded and so must
be divided by 8 (bits in a byte) to determine the effect on the byte
capacity of the bit_writer.
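
The bits-to-bytes arithmetic can be checked with a short sketch. The
helper names below are hypothetical and not part of this change. The
sketch also makes two assumptions that differ from the patch: it
rounds a partial byte up instead of using truncating division, and it
computes how many 256-byte steps a batch would need instead of
extending once.

```rust
/// Bytes needed to hold `num_values` one-bit values, counting a
/// partially filled byte as a whole byte.
fn bytes_for_bools(num_values: usize) -> usize {
    (num_values + 7) / 8
}

/// Number of `step`-byte increments needed so that `capacity` covers
/// the bytes already written plus the pending values.
/// `step` must be non-zero.
fn increments_needed(
    bytes_written: usize,
    num_values: usize,
    capacity: usize,
    step: usize,
) -> usize {
    let required = bytes_written + bytes_for_bools(num_values);
    if required <= capacity {
        0
    } else {
        // Round the shortfall up to a whole number of steps.
        (required - capacity + step - 1) / step
    }
}
```

For example, 2048 values need 256 bytes and fit the initial buffer,
2049 values need 257 bytes and one increment, and 8192 values need
1024 bytes, which is three increments beyond the initial 256.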
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index aa1def3..f97df3c 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -661,8 +661,15 @@
             _: &mut W,
             bit_writer: &mut BitWriter,
         ) -> Result<()> {
+            if bit_writer.bytes_written() + values.len() / 8 >= bit_writer.capacity() {
+                bit_writer.extend(256);
+            }
             for value in values {
-                bit_writer.put_value(*value as u64, 1);
+                if !bit_writer.put_value(*value as u64, 1) {
+                    return Err(ParquetError::EOF(
+                        "unable to put boolean value".to_string(),
+                    ));
+                }
             }
             Ok(())
         }
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 8dfb631..45cfe2b 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -223,6 +223,20 @@
         }
     }
 
+    /// Extend buffer size
+    #[inline]
+    pub fn extend(&mut self, increment: usize) {
+        self.max_bytes += increment;
+        let extra = vec![0; increment];
+        self.buffer.extend(extra);
+    }
+
+    /// Report buffer size
+    #[inline]
+    pub fn capacity(&self) -> usize {
+        self.max_bytes
+    }
+
     /// Consumes and returns the current buffer.
     #[inline]
     pub fn consume(mut self) -> Vec<u8> {
diff --git a/parquet/tests/boolean_writer.rs b/parquet/tests/boolean_writer.rs
new file mode 100644
index 0000000..b9d757e
--- /dev/null
+++ b/parquet/tests/boolean_writer.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use parquet::column::writer::ColumnWriter;
+use parquet::file::properties::WriterProperties;
+use parquet::file::reader::FileReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use parquet::file::writer::FileWriter;
+use parquet::file::writer::SerializedFileWriter;
+use parquet::schema::parser::parse_message_type;
+use std::fs;
+use std::path::Path;
+use std::sync::{mpsc, Arc};
+use std::thread;
+use std::time::Duration;
+
+#[test]
+fn it_writes_data_without_hanging() {
+    let path = Path::new("it_writes_data_without_hanging.parquet");
+
+    let message_type = "
+  message BooleanType {
+    REQUIRED BOOLEAN DIM0;
+  }
+";
+    let schema = Arc::new(parse_message_type(message_type).expect("parse schema"));
+    let props = Arc::new(WriterProperties::builder().build());
+    let file = fs::File::create(&path).expect("create file");
+    let mut writer =
+        SerializedFileWriter::new(file, schema, props).expect("create parquet writer");
+    for _group in 0..1 {
+        let mut row_group_writer = writer.next_row_group().expect("get row group writer");
+        let values: Vec<i64> = vec![0; 2049];
+        let my_bool_values: Vec<bool> = values
+            .iter()
+            .enumerate()
+            .map(|(count, _x)| count % 2 == 0)
+            .collect();
+        while let Some(mut col_writer) =
+            row_group_writer.next_column().expect("next column")
+        {
+            match col_writer {
+                ColumnWriter::BoolColumnWriter(ref mut typed_writer) => {
+                    typed_writer
+                        .write_batch(&my_bool_values, None, None)
+                        .expect("writing bool column");
+                }
+                _ => {
+                    panic!("only test boolean values");
+                }
+            }
+            row_group_writer
+                .close_column(col_writer)
+                .expect("close column");
+        }
+        let rg_md = row_group_writer.close().expect("close row group");
+        println!("total rows written: {}", rg_md.num_rows());
+        writer
+            .close_row_group(row_group_writer)
+            .expect("close row groups");
+    }
+    writer.close().expect("close writer");
+
+    let bytes = fs::read(&path).expect("read file");
+    assert_eq!(&bytes[0..4], &[b'P', b'A', b'R', b'1']);
+
+    // Now that we have written our data and are happy with it, make
+    // sure we can read it back in < 5 seconds...
+    let (sender, receiver) = mpsc::channel();
+    let _t = thread::spawn(move || {
+        let file = fs::File::open(&Path::new("it_writes_data_without_hanging.parquet"))
+            .expect("open file");
+        let reader = SerializedFileReader::new(file).expect("get serialized reader");
+        let iter = reader.get_row_iter(None).expect("get iterator");
+        for record in iter {
+            println!("reading: {}", record);
+        }
+        println!("finished reading");
+        if let Ok(()) = sender.send(true) {}
+    });
+    assert_ne!(
+        Err(mpsc::RecvTimeoutError::Timeout),
+        receiver.recv_timeout(Duration::from_millis(5000))
+    );
+    fs::remove_file("it_writes_data_without_hanging.parquet").expect("remove file");
+}