PARQUET-1350: [C++] Use abstract ResizableBuffer instead of concrete PoolBuffer
Author: Antoine Pitrou <antoine@python.org>
Closes #477 from pitrou/PARQUET-1350-pool-buffer and squashes the following commits:
3af304e [Antoine Pitrou] Fix linting
6ffc72d [Antoine Pitrou] PARQUET-1350: [C++] Use abstract ResizableBuffer instead of concrete PoolBuffer
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index e0ff7aa..1c2f322 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -53,7 +53,7 @@
using arrow::Column;
using arrow::DataType;
using arrow::ListArray;
-using arrow::PoolBuffer;
+using arrow::ResizableBuffer;
using arrow::PrimitiveArray;
using arrow::Status;
using arrow::Table;
@@ -791,7 +791,7 @@
ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
std::shared_ptr<Table> table = MakeSimpleTable(values, false);
this->sink_ = std::make_shared<InMemoryOutputStream>();
- auto buffer = std::make_shared<::arrow::PoolBuffer>();
+ auto buffer = AllocateBuffer();
{
// BufferOutputStream closed on gc
@@ -946,8 +946,7 @@
ASSERT_OK_NO_THROW(
WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties));
- std::shared_ptr<PoolBuffer> int64_data =
- std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
+ std::shared_ptr<ResizableBuffer> int64_data = AllocateBuffer();
{
ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
auto int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 1f933e6..be270a8 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -46,7 +46,7 @@
using arrow::Int32Array;
using arrow::ListArray;
using arrow::MemoryPool;
-using arrow::PoolBuffer;
+using arrow::ResizableBuffer;
using arrow::Status;
using arrow::StructArray;
using arrow::Table;
@@ -303,8 +303,7 @@
int16_t struct_def_level, MemoryPool* pool, const Node* node)
: children_(children),
struct_def_level_(struct_def_level),
- pool_(pool),
- def_levels_buffer_(pool) {
+ pool_(pool) {
InitField(node, children);
}
@@ -318,7 +317,7 @@
int16_t struct_def_level_;
MemoryPool* pool_;
std::shared_ptr<Field> field_;
- PoolBuffer def_levels_buffer_;
+ std::shared_ptr<ResizableBuffer> def_levels_buffer_;
Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
void InitField(const Node* node,
@@ -849,7 +848,7 @@
std::copy(values, values + length, out_ptr);
if (reader->nullable_values()) {
- std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
*out = std::make_shared<ArrayType<ArrowType>>(type, length, data, is_valid,
reader->null_count());
} else {
@@ -866,10 +865,10 @@
const std::shared_ptr<::arrow::DataType>& type,
std::shared_ptr<Array>* out) {
int64_t length = reader->values_written();
- std::shared_ptr<PoolBuffer> values = reader->ReleaseValues();
+ std::shared_ptr<ResizableBuffer> values = reader->ReleaseValues();
if (reader->nullable_values()) {
- std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
*out = std::make_shared<ArrayType<ArrowType>>(type, length, values, is_valid,
reader->null_count());
} else {
@@ -902,7 +901,7 @@
}
if (reader->nullable_values()) {
- std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
RETURN_NOT_OK(is_valid->Resize(BytesForBits(length), false));
*out = std::make_shared<BooleanArray>(type, length, data, is_valid,
reader->null_count());
@@ -930,7 +929,7 @@
}
if (reader->nullable_values()) {
- std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
*out = std::make_shared<TimestampArray>(type, length, data, is_valid,
reader->null_count());
} else {
@@ -958,7 +957,7 @@
}
if (reader->nullable_values()) {
- std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
*out = std::make_shared<::arrow::Date64Array>(type, length, data, is_valid,
reader->null_count());
} else {
@@ -1197,7 +1196,7 @@
}
if (reader->nullable_values()) {
- std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
*out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid,
reader->null_count());
} else {
@@ -1385,10 +1384,10 @@
size_t child_length;
RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
auto size = child_length * sizeof(int16_t);
- RETURN_NOT_OK(def_levels_buffer_.Resize(size));
+ RETURN_NOT_OK(AllocateResizableBuffer(pool_, size, &def_levels_buffer_));
// Initialize with the minimal def level
- std::memset(def_levels_buffer_.mutable_data(), -1, size);
- auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+ std::memset(def_levels_buffer_->mutable_data(), -1, size);
+ auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
// When a struct is defined, all of its children def levels are at least at
// nesting level, and def level equals nesting level.
@@ -1408,7 +1407,7 @@
std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
}
}
- *data = reinterpret_cast<const int16_t*>(def_levels_buffer_.data());
+ *data = reinterpret_cast<const int16_t*>(def_levels_buffer_->data());
*length = child_length;
return Status::OK();
}
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
index a3af5ac..b4d8766 100644
--- a/src/parquet/arrow/record_reader.cc
+++ b/src/parquet/arrow/record_reader.cc
@@ -68,10 +68,10 @@
levels_position_(0),
levels_capacity_(0) {
nullable_values_ = internal::HasSpacedValues(descr);
- values_ = std::make_shared<PoolBuffer>(pool);
- valid_bits_ = std::make_shared<PoolBuffer>(pool);
- def_levels_ = std::make_shared<PoolBuffer>(pool);
- rep_levels_ = std::make_shared<PoolBuffer>(pool);
+ values_ = AllocateBuffer(pool);
+ valid_bits_ = AllocateBuffer(pool);
+ def_levels_ = AllocateBuffer(pool);
+ rep_levels_ = AllocateBuffer(pool);
if (descr->physical_type() == Type::BYTE_ARRAY) {
builder_.reset(new ::arrow::BinaryBuilder(pool));
@@ -121,15 +121,15 @@
bool nullable_values() const { return nullable_values_; }
- std::shared_ptr<PoolBuffer> ReleaseValues() {
+ std::shared_ptr<ResizableBuffer> ReleaseValues() {
auto result = values_;
- values_ = std::make_shared<PoolBuffer>(pool_);
+ values_ = AllocateBuffer(pool_);
return result;
}
- std::shared_ptr<PoolBuffer> ReleaseIsValid() {
+ std::shared_ptr<ResizableBuffer> ReleaseIsValid() {
auto result = valid_bits_;
- valid_bits_ = std::make_shared<PoolBuffer>(pool_);
+ valid_bits_ = AllocateBuffer(pool_);
return result;
}
@@ -328,16 +328,16 @@
// TODO(wesm): ByteArray / FixedLenByteArray types
std::unique_ptr<::arrow::ArrayBuilder> builder_;
- std::shared_ptr<::arrow::PoolBuffer> values_;
+ std::shared_ptr<::arrow::ResizableBuffer> values_;
template <typename T>
T* ValuesHead() {
return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
}
- std::shared_ptr<::arrow::PoolBuffer> valid_bits_;
- std::shared_ptr<::arrow::PoolBuffer> def_levels_;
- std::shared_ptr<::arrow::PoolBuffer> rep_levels_;
+ std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
+ std::shared_ptr<::arrow::ResizableBuffer> def_levels_;
+ std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;
};
// The minimum number of repetition/definition levels to decode at a time, for
@@ -775,11 +775,11 @@
const uint8_t* RecordReader::values() const { return impl_->values(); }
-std::shared_ptr<PoolBuffer> RecordReader::ReleaseValues() {
+std::shared_ptr<ResizableBuffer> RecordReader::ReleaseValues() {
return impl_->ReleaseValues();
}
-std::shared_ptr<PoolBuffer> RecordReader::ReleaseIsValid() {
+std::shared_ptr<ResizableBuffer> RecordReader::ReleaseIsValid() {
return impl_->ReleaseIsValid();
}
diff --git a/src/parquet/arrow/record_reader.h b/src/parquet/arrow/record_reader.h
index 9ca8b68..f02bf05 100644
--- a/src/parquet/arrow/record_reader.h
+++ b/src/parquet/arrow/record_reader.h
@@ -73,8 +73,8 @@
/// result of calling ReadRecords
void Reset();
- std::shared_ptr<PoolBuffer> ReleaseValues();
- std::shared_ptr<PoolBuffer> ReleaseIsValid();
+ std::shared_ptr<ResizableBuffer> ReleaseValues();
+ std::shared_ptr<ResizableBuffer> ReleaseIsValid();
::arrow::ArrayBuilder* builder();
/// \brief Number of values written including nulls (if any)
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index c70e0ef..4db98b7 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -363,12 +363,11 @@
int64_t non_null_entries = size - null_count - 1;
int64_t length_per_entry = values->length() / non_null_entries;
- auto offsets = std::make_shared<::arrow::PoolBuffer>(::arrow::default_memory_pool());
+ auto offsets = AllocateBuffer();
RETURN_NOT_OK(offsets->Resize((size + 1) * sizeof(int32_t)));
int32_t* offsets_ptr = reinterpret_cast<int32_t*>(offsets->mutable_data());
- auto null_bitmap =
- std::make_shared<::arrow::PoolBuffer>(::arrow::default_memory_pool());
+ auto null_bitmap = AllocateBuffer();
int64_t bitmap_size = ::arrow::BitUtil::CeilByte(size) / 8;
RETURN_NOT_OK(null_bitmap->Resize(bitmap_size));
uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index f772738..f3ddda9 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -41,7 +41,7 @@
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::NumericArray;
-using arrow::PoolBuffer;
+using arrow::ResizableBuffer;
using arrow::PrimitiveArray;
using arrow::Status;
using arrow::Table;
@@ -109,7 +109,7 @@
Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
- const std::shared_ptr<PoolBuffer>& def_levels_scratch,
+ const std::shared_ptr<ResizableBuffer>& def_levels_scratch,
std::shared_ptr<Buffer>* def_levels_out,
std::shared_ptr<Buffer>* rep_levels_out,
std::shared_ptr<Array>* values_array) {
@@ -266,8 +266,8 @@
struct ColumnWriterContext {
ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
: memory_pool(memory_pool), properties(properties) {
- this->data_buffer = std::make_shared<PoolBuffer>(memory_pool);
- this->def_levels_buffer = std::make_shared<PoolBuffer>(memory_pool);
+ this->data_buffer = AllocateBuffer(memory_pool);
+ this->def_levels_buffer = AllocateBuffer(memory_pool);
}
template <typename T>
@@ -282,10 +282,10 @@
// Buffer used for storing the data of an array converted to the physical type
// as expected by parquet-cpp.
- std::shared_ptr<PoolBuffer> data_buffer;
+ std::shared_ptr<ResizableBuffer> data_buffer;
// We use the shared ownership of this buffer
- std::shared_ptr<PoolBuffer> def_levels_buffer;
+ std::shared_ptr<ResizableBuffer> def_levels_buffer;
};
Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) {
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
index ad625bd..a9a7530 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -188,7 +188,7 @@
int16_t max_level = 1;
int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
static_cast<int>(levels.size()));
- auto buffer_rle = std::make_shared<PoolBuffer>();
+ auto buffer_rle = AllocateBuffer();
PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size));
while (state.KeepRunning()) {
@@ -212,7 +212,7 @@
int16_t max_level = 1;
int rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
static_cast<int>(levels.size()));
- auto buffer_rle = std::make_shared<PoolBuffer>();
+ auto buffer_rle = AllocateBuffer();
PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(int32_t)));
level_encoder.Init(Encoding::RLE, max_level, static_cast<int>(levels.size()),
buffer_rle->mutable_data() + sizeof(int32_t), rle_size);
diff --git a/src/parquet/column_reader-test.cc b/src/parquet/column_reader-test.cc
index 15ddc8b..273b302 100644
--- a/src/parquet/column_reader-test.cc
+++ b/src/parquet/column_reader-test.cc
@@ -328,7 +328,7 @@
max_rep_level_ = 0;
NodePtr type = schema::Int32("a", Repetition::REQUIRED);
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
- shared_ptr<PoolBuffer> dummy = std::make_shared<PoolBuffer>();
+ shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
shared_ptr<DictionaryPage> dict_page =
std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index bcbb339..bc3ee8a 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -124,7 +124,7 @@
// Compression codec to use.
std::unique_ptr<::arrow::Codec> decompressor_;
- std::shared_ptr<PoolBuffer> decompression_buffer_;
+ std::shared_ptr<ResizableBuffer> decompression_buffer_;
// Maximum allowed page size
uint32_t max_page_header_size_;
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index 0d5f6ec..7134632 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -488,12 +488,12 @@
int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint
int64_t values_read = 0;
- std::shared_ptr<PoolBuffer> vals = AllocateBuffer(
+ std::shared_ptr<ResizableBuffer> vals = AllocateBuffer(
this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size);
- std::shared_ptr<PoolBuffer> def_levels =
+ std::shared_ptr<ResizableBuffer> def_levels =
AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
- std::shared_ptr<PoolBuffer> rep_levels =
+ std::shared_ptr<ResizableBuffer> rep_levels =
AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
do {
diff --git a/src/parquet/column_scanner.h b/src/parquet/column_scanner.h
index 2917201..0a866ee 100644
--- a/src/parquet/column_scanner.h
+++ b/src/parquet/column_scanner.h
@@ -44,7 +44,7 @@
: batch_size_(batch_size),
level_offset_(0),
levels_buffered_(0),
- value_buffer_(std::make_shared<PoolBuffer>(pool)),
+ value_buffer_(AllocateBuffer(pool)),
value_offset_(0),
values_buffered_(0),
reader_(reader) {
@@ -77,7 +77,7 @@
int level_offset_;
int levels_buffered_;
- std::shared_ptr<PoolBuffer> value_buffer_;
+ std::shared_ptr<ResizableBuffer> value_buffer_;
int value_offset_;
int64_t values_buffered_;
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 8a1b56c..b3ff8c3 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -510,7 +510,7 @@
template <typename Type>
void TypedColumnWriter<Type>::WriteDictionaryPage() {
auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
- std::shared_ptr<PoolBuffer> buffer =
+ std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
dict_encoder->WriteDict(buffer->mutable_data());
// TODO Get rid of this deep call
diff --git a/src/parquet/encoding-benchmark.cc b/src/parquet/encoding-benchmark.cc
index ca12c6a..5ea8f8f 100644
--- a/src/parquet/encoding-benchmark.cc
+++ b/src/parquet/encoding-benchmark.cc
@@ -113,10 +113,10 @@
encoder.Put(values[i]);
}
- std::shared_ptr<PoolBuffer> dict_buffer =
+ std::shared_ptr<ResizableBuffer> dict_buffer =
AllocateBuffer(allocator, encoder.dict_encoded_size());
- std::shared_ptr<PoolBuffer> indices =
+ std::shared_ptr<ResizableBuffer> indices =
AllocateBuffer(allocator, encoder.EstimatedDataEncodedSize());
encoder.WriteDict(dict_buffer->mutable_data());
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index e22edd0..98f9e4a 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -271,7 +271,7 @@
protected:
int bits_available_;
std::unique_ptr<::arrow::BitWriter> bit_writer_;
- std::shared_ptr<PoolBuffer> bits_buffer_;
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
std::unique_ptr<InMemoryOutputStream> values_sink_;
};
@@ -370,7 +370,7 @@
// Data that contains the byte array data (byte_array_dictionary_ just has the
// pointers).
- std::shared_ptr<PoolBuffer> byte_array_data_;
+ std::shared_ptr<ResizableBuffer> byte_array_data_;
::arrow::RleDecoder idx_decoder_;
};
@@ -514,7 +514,7 @@
void Put(const T& value);
std::shared_ptr<Buffer> FlushValues() override {
- std::shared_ptr<PoolBuffer> buffer =
+ std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(this->allocator_, EstimatedDataEncodedSize());
int result_size = WriteIndices(buffer->mutable_data(),
static_cast<int>(EstimatedDataEncodedSize()));
@@ -784,8 +784,7 @@
explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
- : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED),
- delta_bit_widths_(new PoolBuffer(pool)) {
+ : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) {
if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
throw ParquetException("Delta bit pack encoding should only be for integer data.");
}
@@ -813,8 +812,8 @@
ParquetException::EofException();
}
if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
- PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_, false));
+ delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_);
uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
@@ -858,6 +857,7 @@
return max_values;
}
+ ::arrow::MemoryPool* pool_;
::arrow::BitReader decoder_;
int32_t values_current_block_;
int32_t num_mini_blocks_;
@@ -866,7 +866,7 @@
int32_t min_delta_;
size_t mini_block_idx_;
- std::unique_ptr<PoolBuffer> delta_bit_widths_;
+ std::shared_ptr<ResizableBuffer> delta_bit_widths_;
int delta_bit_width_;
int32_t last_value_;
diff --git a/src/parquet/encoding-test.cc b/src/parquet/encoding-test.cc
index 8d97bff..31bb79d 100644
--- a/src/parquet/encoding-test.cc
+++ b/src/parquet/encoding-test.cc
@@ -292,7 +292,7 @@
protected:
USING_BASE_MEMBERS();
- std::shared_ptr<PoolBuffer> dict_buffer_;
+ std::shared_ptr<ResizableBuffer> dict_buffer_;
};
TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes);
diff --git a/src/parquet/encoding.h b/src/parquet/encoding.h
index e46ac2f..2742937 100644
--- a/src/parquet/encoding.h
+++ b/src/parquet/encoding.h
@@ -50,18 +50,19 @@
virtual void Put(const T* src, int num_values) = 0;
virtual void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset) {
- PoolBuffer buffer(pool_);
- ::arrow::Status status = buffer.Resize(num_values * sizeof(T));
+ std::shared_ptr<ResizableBuffer> buffer;
+ auto status = ::arrow::AllocateResizableBuffer(pool_, num_values * sizeof(T),
+ &buffer);
if (!status.ok()) {
std::ostringstream ss;
- ss << "buffer.Resize failed in Encoder.PutSpaced in " << __FILE__ << ", on line "
- << __LINE__;
+ ss << "AllocateResizableBuffer failed in Encoder.PutSpaced in "
+ << __FILE__ << ", on line " << __LINE__;
throw ParquetException(ss.str());
}
int32_t num_valid_values = 0;
::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
num_values);
- T* data = reinterpret_cast<T*>(buffer.mutable_data());
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
for (int32_t i = 0; i < num_values; i++) {
if (valid_bits_reader.IsSet()) {
data[num_valid_values++] = src[i];
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
index ae1c0a7..c5a0f34 100644
--- a/src/parquet/file_reader.cc
+++ b/src/parquet/file_reader.cc
@@ -194,7 +194,7 @@
"file metadata size.");
}
- std::shared_ptr<PoolBuffer> metadata_buffer =
+ std::shared_ptr<ResizableBuffer> metadata_buffer =
AllocateBuffer(properties_.memory_pool(), metadata_len);
// Check if the footer_buffer contains the entire metadata
diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h
index 4f9df72..d1c4d16 100644
--- a/src/parquet/statistics.h
+++ b/src/parquet/statistics.h
@@ -181,19 +181,19 @@
void PlainEncode(const T& src, std::string* dst);
void PlainDecode(const std::string& src, T* dst);
- void Copy(const T& src, T* dst, PoolBuffer* buffer);
+ void Copy(const T& src, T* dst, ResizableBuffer* buffer);
- std::shared_ptr<PoolBuffer> min_buffer_, max_buffer_;
+ std::shared_ptr<ResizableBuffer> min_buffer_, max_buffer_;
};
template <typename DType>
-inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, PoolBuffer*) {
+inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, ResizableBuffer*) {
*dst = src;
}
template <>
inline void TypedRowGroupStatistics<FLBAType>::Copy(const FLBA& src, FLBA* dst,
- PoolBuffer* buffer) {
+ ResizableBuffer* buffer) {
if (dst->ptr == src.ptr) return;
uint32_t len = descr_->type_length();
PARQUET_THROW_NOT_OK(buffer->Resize(len, false));
@@ -204,7 +204,7 @@
template <>
inline void TypedRowGroupStatistics<ByteArrayType>::Copy(const ByteArray& src,
ByteArray* dst,
- PoolBuffer* buffer) {
+ ResizableBuffer* buffer) {
if (dst->ptr == src.ptr) return;
PARQUET_THROW_NOT_OK(buffer->Resize(src.len, false));
std::memcpy(buffer->mutable_data(), src.ptr, src.len);
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index a507dfb..3e74398 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -262,7 +262,7 @@
}
shared_ptr<Buffer> WriteDict() {
- std::shared_ptr<PoolBuffer> dict_buffer =
+ std::shared_ptr<ResizableBuffer> dict_buffer =
AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size());
encoder_->WriteDict(dict_buffer->mutable_data());
return dict_buffer;
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
index 4b620ab..cb8c706 100644
--- a/src/parquet/util/memory-test.cc
+++ b/src/parquet/util/memory-test.cc
@@ -255,7 +255,8 @@
int64_t stream_offset = 10;
int64_t stream_size = source_size - stream_offset;
int64_t chunk_size = 50;
- std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_memory_pool(), source_size);
+ std::shared_ptr<ResizableBuffer> buf = AllocateBuffer(default_memory_pool(),
+ source_size);
ASSERT_EQ(source_size, buf->size());
for (int i = 0; i < source_size; i++) {
buf->mutable_data()[i] = static_cast<uint8_t>(i);
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
index 3aa2570..df7ccc7 100644
--- a/src/parquet/util/memory.cc
+++ b/src/parquet/util/memory.cc
@@ -36,7 +36,7 @@
template <class T>
Vector<T>::Vector(int64_t size, MemoryPool* pool)
- : buffer_(AllocateUniqueBuffer(pool, size * sizeof(T))),
+ : buffer_(AllocateBuffer(pool, size * sizeof(T))),
size_(size),
capacity_(size) {
if (size > 0) {
@@ -495,19 +495,10 @@
buffer_offset_ += num_bytes;
}
-std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryPool* pool, int64_t size) {
- auto result = std::make_shared<PoolBuffer>(pool);
- if (size > 0) {
- PARQUET_THROW_NOT_OK(result->Resize(size));
- }
- return result;
-}
-
-std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(MemoryPool* pool, int64_t size) {
- std::unique_ptr<PoolBuffer> result(new PoolBuffer(pool));
- if (size > 0) {
- PARQUET_THROW_NOT_OK(result->Resize(size));
- }
+std::shared_ptr<ResizableBuffer> AllocateBuffer(MemoryPool* pool, int64_t size) {
+ std::shared_ptr<ResizableBuffer> result;
+ PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(pool, size,
+ &result));
return result;
}
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 5408d1c..69dcebf 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -74,7 +74,7 @@
using Buffer = ::arrow::Buffer;
using MutableBuffer = ::arrow::MutableBuffer;
using ResizableBuffer = ::arrow::ResizableBuffer;
-using PoolBuffer = ::arrow::PoolBuffer;
+// NOTE: the PoolBuffer alias is gone; use the ResizableBuffer alias declared above.
template <class T>
class PARQUET_EXPORT Vector {
@@ -89,7 +89,7 @@
const T* data() const { return data_; }
private:
- std::unique_ptr<PoolBuffer> buffer_;
+ std::shared_ptr<ResizableBuffer> buffer_;
int64_t size_;
int64_t capacity_;
T* data_;
@@ -429,7 +429,7 @@
virtual void Advance(int64_t num_bytes);
private:
- std::shared_ptr<PoolBuffer> buffer_;
+ std::shared_ptr<ResizableBuffer> buffer_;
RandomAccessSource* source_;
int64_t stream_offset_;
int64_t stream_end_;
@@ -437,11 +437,8 @@
int64_t buffer_size_;
};
-std::shared_ptr<PoolBuffer> PARQUET_EXPORT AllocateBuffer(::arrow::MemoryPool* pool,
- int64_t size = 0);
-
-std::unique_ptr<PoolBuffer> PARQUET_EXPORT AllocateUniqueBuffer(::arrow::MemoryPool* pool,
- int64_t size = 0);
+std::shared_ptr<ResizableBuffer> PARQUET_EXPORT AllocateBuffer(
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), int64_t size = 0);
} // namespace parquet