PARQUET-859: Flatten parquet/file directory, consolidate file reader, file writer code
I believe this makes the codebase simpler and easier to navigate. Consolidating the file reader and writer code should also make further refactoring and internal improvements more straightforward.
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #424 from wesm/PARQUET-859 and squashes the following commits:
c98e3d4 [Wes McKinney] Do not generate Thrift files in source directory
e987f47 [Wes McKinney] Remove superfluous PARQUET_EXPORT
0fb83a8 [Wes McKinney] Consolidate file_reader.h, file_reader-internal.h
ac6e2c5 [Wes McKinney] First cut flattening of parquet/file directory and consolidate writer, writer-internal headers
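
For downstream callers, the practical effect of this change is that the parquet/file/ headers disappear and SerializedPageReader/SerializedPageWriter become implementation details; pages are now obtained through the PageReader::Open and PageWriter::Open factories declared in column_reader.h and column_writer.h, and the file-level APIs move to parquet/file_reader.h and parquet/file_writer.h. The following is a minimal sketch of the new call pattern, inferred from the updated tests and benchmarks in this diff; the helper function names are hypothetical, and the setup of the stream, sink, and metadata builder is assumed to happen elsewhere.

    #include <cstdint>
    #include <memory>

    // Flattened public headers replace the old parquet/file/ directory.
    #include "parquet/column_reader.h"   // PageReader + PageReader::Open
    #include "parquet/column_writer.h"   // PageWriter + PageWriter::Open
    #include "parquet/file_reader.h"     // was parquet/file/reader.h
    #include "parquet/file_writer.h"     // was parquet/file/writer.h

    // Hypothetical helpers; the InputStream/OutputStream/metadata objects are
    // assumed to be constructed as in the updated tests and benchmarks below.
    std::unique_ptr<parquet::PageReader> MakePageReader(
        std::unique_ptr<parquet::InputStream> stream, int64_t num_rows) {
      // Previously: new SerializedPageReader(std::move(stream), num_rows, codec)
      return parquet::PageReader::Open(std::move(stream), num_rows,
                                       parquet::Compression::UNCOMPRESSED);
    }

    std::unique_ptr<parquet::PageWriter> MakePageWriter(
        parquet::OutputStream* sink,
        parquet::ColumnChunkMetaDataBuilder* metadata) {
      // Previously: new SerializedPageWriter(sink, codec, metadata)
      return parquet::PageWriter::Open(sink, parquet::Compression::UNCOMPRESSED,
                                       metadata);
    }
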
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8a3558c..4774631 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -375,6 +375,7 @@
# Dependencies
############################################################
+include_directories(${CMAKE_CURRENT_BINARY_DIR}/src)
include_directories(
${CMAKE_CURRENT_SOURCE_DIR}/src
)
@@ -661,7 +662,7 @@
endif()
# List of thrift output targets
-set(THRIFT_OUTPUT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src/parquet)
+set(THRIFT_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR}/src/parquet)
set(THRIFT_OUTPUT_FILES "${THRIFT_OUTPUT_DIR}/parquet_types.cpp")
set(THRIFT_OUTPUT_FILES ${THRIFT_OUTPUT_FILES} "${THRIFT_OUTPUT_DIR}/parquet_types.h")
set(THRIFT_OUTPUT_FILES ${THRIFT_OUTPUT_FILES} "${THRIFT_OUTPUT_DIR}/parquet_constants.cpp")
@@ -683,30 +684,23 @@
# Library config
set(LIBPARQUET_SRCS
- src/parquet/exception.cc
- src/parquet/types.cc
-
src/parquet/arrow/reader.cc
src/parquet/arrow/record_reader.cc
src/parquet/arrow/schema.cc
src/parquet/arrow/writer.cc
-
src/parquet/column_reader.cc
src/parquet/column_scanner.cc
src/parquet/column_writer.cc
-
- src/parquet/file/metadata.cc
- src/parquet/file/printer.cc
- src/parquet/file/reader.cc
- src/parquet/file/reader-internal.cc
- src/parquet/file/writer.cc
- src/parquet/file/writer-internal.cc
-
- src/parquet/schema.cc
- src/parquet/statistics.cc
-
+ src/parquet/exception.cc
+ src/parquet/file_reader.cc
+ src/parquet/file_writer.cc
+ src/parquet/metadata.cc
src/parquet/parquet_constants.cpp
src/parquet/parquet_types.cpp
+ src/parquet/printer.cc
+ src/parquet/schema.cc
+ src/parquet/statistics.cc
+ src/parquet/types.cc
src/parquet/util/comparison.cc
src/parquet/util/memory.cc
)
@@ -785,7 +779,6 @@
add_subdirectory(src/parquet)
add_subdirectory(src/parquet/api)
add_subdirectory(src/parquet/arrow)
-add_subdirectory(src/parquet/file)
add_subdirectory(src/parquet/util)
if (NOT MSVC)
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index a2e283e..bc16d8b 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -23,6 +23,10 @@
column_writer.h
encoding.h
exception.h
+ file_reader.h
+ file_writer.h
+ metadata.h
+ printer.h
properties.h
schema.h
statistics.h
@@ -49,9 +53,12 @@
ADD_PARQUET_TEST(column_reader-test)
ADD_PARQUET_TEST(column_scanner-test)
ADD_PARQUET_TEST(column_writer-test)
+ADD_PARQUET_TEST(file-deserialize-test)
+ADD_PARQUET_TEST(file-serialize-test)
ADD_PARQUET_TEST(properties-test)
ADD_PARQUET_TEST(statistics-test)
ADD_PARQUET_TEST(encoding-test)
+ADD_PARQUET_TEST(metadata-test)
ADD_PARQUET_TEST(public-api-test)
ADD_PARQUET_TEST(types-test)
ADD_PARQUET_TEST(reader-test)
diff --git a/src/parquet/api/reader.h b/src/parquet/api/reader.h
index ba9717a..505654f 100644
--- a/src/parquet/api/reader.h
+++ b/src/parquet/api/reader.h
@@ -22,11 +22,9 @@
#include "parquet/column_reader.h"
#include "parquet/column_scanner.h"
#include "parquet/exception.h"
-#include "parquet/file/printer.h"
-#include "parquet/file/reader.h"
-
-// Metadata reader API
-#include "parquet/file/metadata.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/printer.h"
// Schemas
#include "parquet/api/schema.h"
diff --git a/src/parquet/api/writer.h b/src/parquet/api/writer.h
index cc3ae2a..3b4e42f 100644
--- a/src/parquet/api/writer.h
+++ b/src/parquet/api/writer.h
@@ -18,15 +18,10 @@
#ifndef PARQUET_API_WRITER_H
#define PARQUET_API_WRITER_H
-// Column reader API
+#include "parquet/api/io.h"
+#include "parquet/api/schema.h"
#include "parquet/column_writer.h"
#include "parquet/exception.h"
-#include "parquet/file/writer.h"
-
-// Schemas
-#include "parquet/api/schema.h"
-
-// IO
-#include "parquet/api/io.h"
+#include "parquet/file_writer.h"
#endif // PARQUET_API_WRITER_H
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index edeef1e..15d2cf7 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -23,8 +23,8 @@
#include "parquet/arrow/writer.h"
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
#include "parquet/util/memory.h"
#include "arrow/api.h"
@@ -142,7 +142,6 @@
template <bool nullable, typename ParquetType>
static void BM_WriteColumn(::benchmark::State& state) {
- format::ColumnChunk thrift_metadata;
std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<ParquetType>(values, nullable);
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index a8d3824..c10b164 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -34,7 +34,7 @@
#include "parquet/arrow/test-util.h"
#include "parquet/arrow/writer.h"
-#include "parquet/file/writer.h"
+#include "parquet/file_writer.h"
#include "arrow/api.h"
#include "arrow/test-util.h"
diff --git a/src/parquet/arrow/record_reader.h b/src/parquet/arrow/record_reader.h
index 8d55f9d..9ca8b68 100644
--- a/src/parquet/arrow/record_reader.h
+++ b/src/parquet/arrow/record_reader.h
@@ -30,7 +30,7 @@
#include <arrow/memory_pool.h>
#include <arrow/util/bit-util.h>
-#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
#include "parquet/schema.h"
#include "parquet/util/visibility.h"
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
index ec7b52e..7c8d093 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -19,8 +19,8 @@
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
+#include "parquet/file_reader.h"
+#include "parquet/parquet_types.h"
#include "parquet/util/memory.h"
namespace parquet {
@@ -33,8 +33,8 @@
ColumnChunkMetaDataBuilder* metadata,
ColumnDescriptor* schema,
const WriterProperties* properties) {
- std::unique_ptr<SerializedPageWriter> pager(
- new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
+ std::unique_ptr<PageWriter> pager =
+ PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata);
return std::unique_ptr<Int64Writer>(
new Int64Writer(metadata, std::move(pager), Encoding::PLAIN, properties));
}
@@ -110,8 +110,8 @@
std::unique_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
int64_t num_values, ColumnDescriptor* schema) {
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
- std::unique_ptr<SerializedPageReader> page_reader(
- new SerializedPageReader(std::move(source), num_values, Compression::UNCOMPRESSED));
+ std::unique_ptr<PageReader> page_reader =
+ PageReader::Open(std::move(source), num_values, Compression::UNCOMPRESSED);
return std::unique_ptr<Int64Reader>(new Int64Reader(schema, std::move(page_reader)));
}
diff --git a/src/parquet/column_page.h b/src/parquet/column_page.h
index 85e3bb5..c34eee7 100644
--- a/src/parquet/column_page.h
+++ b/src/parquet/column_page.h
@@ -168,35 +168,6 @@
bool is_sorted_;
};
-// Abstract page iterator interface. This way, we can feed column pages to the
-// ColumnReader through whatever mechanism we choose
-class PageReader {
- public:
- virtual ~PageReader() {}
-
- // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
- // containing new Page otherwise
- virtual std::shared_ptr<Page> NextPage() = 0;
-};
-
-class PageWriter {
- public:
- virtual ~PageWriter() {}
-
- // The Column Writer decides if dictionary encoding is used if set and
- // if the dictionary encoding has fallen back to default encoding on reaching dictionary
- // page limit
- virtual void Close(bool has_dictionary, bool fallback) = 0;
-
- virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
-
- virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
-
- virtual bool has_compressor() = 0;
-
- virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
-};
-
} // namespace parquet
#endif // PARQUET_COLUMN_PAGE_H
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index d23e738..91557af 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -24,11 +24,14 @@
#include <arrow/buffer.h>
#include <arrow/memory_pool.h>
#include <arrow/util/bit-util.h>
+#include <arrow/util/compression.h>
#include <arrow/util/rle-encoding.h>
#include "parquet/column_page.h"
#include "parquet/encoding-internal.h"
+#include "parquet/parquet_types.h"
#include "parquet/properties.h"
+#include "parquet/thrift.h"
using arrow::MemoryPool;
@@ -89,6 +92,177 @@
return default_reader_properties;
}
+// ----------------------------------------------------------------------
+// SerializedPageReader deserializes Thrift metadata and pages that have been
+// assembled in a serialized stream for storage in a Parquet file
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift format::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageReader : public PageReader {
+ public:
+ SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t total_num_rows,
+ Compression::type codec, ::arrow::MemoryPool* pool)
+ : stream_(std::move(stream)),
+ decompression_buffer_(AllocateBuffer(pool, 0)),
+ seen_num_rows_(0),
+ total_num_rows_(total_num_rows) {
+ max_page_header_size_ = kDefaultMaxPageHeaderSize;
+ decompressor_ = GetCodecFromArrow(codec);
+ }
+
+ virtual ~SerializedPageReader() {}
+
+ // Implement the PageReader interface
+ std::shared_ptr<Page> NextPage() override;
+
+ void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
+
+ private:
+ std::unique_ptr<InputStream> stream_;
+
+ format::PageHeader current_page_header_;
+ std::shared_ptr<Page> current_page_;
+
+ // Compression codec to use.
+ std::unique_ptr<::arrow::Codec> decompressor_;
+ std::shared_ptr<PoolBuffer> decompression_buffer_;
+
+ // Maximum allowed page size
+ uint32_t max_page_header_size_;
+
+ // Number of rows read in data pages so far
+ int64_t seen_num_rows_;
+
+ // Number of rows in all the data pages
+ int64_t total_num_rows_;
+};
+
+std::shared_ptr<Page> SerializedPageReader::NextPage() {
+ // Loop here because there may be unhandled page types that we skip until
+ // finding a page that we do know what to do with
+ while (seen_num_rows_ < total_num_rows_) {
+ int64_t bytes_read = 0;
+ int64_t bytes_available = 0;
+ uint32_t header_size = 0;
+ const uint8_t* buffer;
+ uint32_t allowed_page_size = kDefaultPageHeaderSize;
+
+ // Page headers can be very large because of page statistics
+ // We try to deserialize a larger buffer progressively
+ // until a maximum allowed header limit
+ while (true) {
+ buffer = stream_->Peek(allowed_page_size, &bytes_available);
+ if (bytes_available == 0) {
+ return std::shared_ptr<Page>(nullptr);
+ }
+
+ // This gets used, then set by DeserializeThriftMsg
+ header_size = static_cast<uint32_t>(bytes_available);
+ try {
+ DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_);
+ break;
+ } catch (std::exception& e) {
+ // Failed to deserialize. Double the allowed page header size and try again
+ std::stringstream ss;
+ ss << e.what();
+ allowed_page_size *= 2;
+ if (allowed_page_size > max_page_header_size_) {
+ ss << "Deserializing page header failed.\n";
+ throw ParquetException(ss.str());
+ }
+ }
+ }
+ // Advance the stream offset
+ stream_->Advance(header_size);
+
+ int compressed_len = current_page_header_.compressed_page_size;
+ int uncompressed_len = current_page_header_.uncompressed_page_size;
+
+ // Read the compressed data page.
+ buffer = stream_->Read(compressed_len, &bytes_read);
+ if (bytes_read != compressed_len) {
+ ParquetException::EofException();
+ }
+
+ // Uncompress it if we need to
+ if (decompressor_ != nullptr) {
+ // Grow the uncompressed buffer if we need to.
+ if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
+ PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
+ }
+ PARQUET_THROW_NOT_OK(
+ decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+ decompression_buffer_->mutable_data()));
+ buffer = decompression_buffer_->data();
+ }
+
+ auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
+
+ if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) {
+ const format::DictionaryPageHeader& dict_header =
+ current_page_header_.dictionary_page_header;
+
+ bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
+
+ return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
+ FromThrift(dict_header.encoding),
+ is_sorted);
+ } else if (current_page_header_.type == format::PageType::DATA_PAGE) {
+ const format::DataPageHeader& header = current_page_header_.data_page_header;
+
+ EncodedStatistics page_statistics;
+ if (header.__isset.statistics) {
+ const format::Statistics& stats = header.statistics;
+ if (stats.__isset.max) {
+ page_statistics.set_max(stats.max);
+ }
+ if (stats.__isset.min) {
+ page_statistics.set_min(stats.min);
+ }
+ if (stats.__isset.null_count) {
+ page_statistics.set_null_count(stats.null_count);
+ }
+ if (stats.__isset.distinct_count) {
+ page_statistics.set_distinct_count(stats.distinct_count);
+ }
+ }
+
+ seen_num_rows_ += header.num_values;
+
+ return std::make_shared<DataPage>(
+ page_buffer, header.num_values, FromThrift(header.encoding),
+ FromThrift(header.definition_level_encoding),
+ FromThrift(header.repetition_level_encoding), page_statistics);
+ } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
+ const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+ bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;
+
+ seen_num_rows_ += header.num_values;
+
+ return std::make_shared<DataPageV2>(
+ page_buffer, header.num_values, header.num_nulls, header.num_rows,
+ FromThrift(header.encoding), header.definition_levels_byte_length,
+ header.repetition_levels_byte_length, is_compressed);
+ } else {
+ // We don't know what this page type is. We're allowed to skip non-data
+ // pages.
+ continue;
+ }
+ }
+ return std::shared_ptr<Page>(nullptr);
+}
+
+std::unique_ptr<PageReader> PageReader::Open(std::unique_ptr<InputStream> stream,
+ int64_t total_num_rows,
+ Compression::type codec,
+ ::arrow::MemoryPool* pool) {
+ return std::unique_ptr<PageReader>(
+ new SerializedPageReader(std::move(stream), total_num_rows, codec, pool));
+}
+
+// ----------------------------------------------------------------------
+
ColumnReader::ColumnReader(const ColumnDescriptor* descr,
std::unique_ptr<PageReader> pager, MemoryPool* pool)
: descr_(descr),
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index dcf41e8..6158cb3 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -49,6 +49,12 @@
namespace parquet {
+// 16 MB is the default maximum page header size
+static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
+
+// 16 KB is the default expected page header size
+static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
+
namespace BitUtil = ::arrow::BitUtil;
class PARQUET_EXPORT LevelDecoder {
@@ -72,6 +78,24 @@
std::unique_ptr<::arrow::BitReader> bit_packed_decoder_;
};
+// Abstract page iterator interface. This way, we can feed column pages to the
+// ColumnReader through whatever mechanism we choose
+class PARQUET_EXPORT PageReader {
+ public:
+ virtual ~PageReader() = default;
+
+ static std::unique_ptr<PageReader> Open(
+ std::unique_ptr<InputStream> stream, int64_t total_num_rows,
+ Compression::type codec,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+ // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
+ // containing new Page otherwise
+ virtual std::shared_ptr<Page> NextPage() = 0;
+
+ virtual void set_max_page_header_size(uint32_t size) = 0;
+};
+
class PARQUET_EXPORT ColumnReader {
public:
ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 681f022..b4e3232 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -19,8 +19,7 @@
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
+#include "parquet/parquet_types.h"
#include "parquet/test-specialization.h"
#include "parquet/test-util.h"
#include "parquet/types.h"
@@ -63,8 +62,8 @@
Compression::type compression = Compression::UNCOMPRESSED) {
auto buffer = sink_->GetBuffer();
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
- std::unique_ptr<SerializedPageReader> page_reader(
- new SerializedPageReader(std::move(source), num_rows, compression));
+ std::unique_ptr<PageReader> page_reader =
+ PageReader::Open(std::move(source), num_rows, compression);
reader_.reset(new TypedColumnReader<TestType>(this->descr_, std::move(page_reader)));
}
@@ -74,8 +73,8 @@
sink_.reset(new InMemoryOutputStream());
metadata_ = ColumnChunkMetaDataBuilder::Make(
writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
- std::unique_ptr<SerializedPageWriter> pager(
- new SerializedPageWriter(sink_.get(), column_properties.codec, metadata_.get()));
+ std::unique_ptr<PageWriter> pager =
+ PageWriter::Open(sink_.get(), column_properties.codec, metadata_.get());
WriterProperties::Builder wp_builder;
if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
column_properties.encoding == Encoding::RLE_DICTIONARY) {
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 4aadf2b..bdaa9f6 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -17,12 +17,17 @@
#include "parquet/column_writer.h"
+#include <cstdint>
+#include <memory>
+
#include "arrow/util/bit-util.h"
+#include "arrow/util/compression.h"
#include "arrow/util/rle-encoding.h"
#include "parquet/encoding-internal.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
+#include "parquet/thrift.h"
#include "parquet/util/logging.h"
#include "parquet/util/memory.h"
@@ -104,6 +109,169 @@
}
// ----------------------------------------------------------------------
+// PageWriter implementation
+
+static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) {
+ format::Statistics statistics;
+ if (row_group_statistics.has_min) statistics.__set_min(row_group_statistics.min());
+ if (row_group_statistics.has_max) statistics.__set_max(row_group_statistics.max());
+ if (row_group_statistics.has_null_count)
+ statistics.__set_null_count(row_group_statistics.null_count);
+ if (row_group_statistics.has_distinct_count)
+ statistics.__set_distinct_count(row_group_statistics.distinct_count);
+ return statistics;
+}
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift format::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageWriter : public PageWriter {
+ public:
+ SerializedPageWriter(OutputStream* sink, Compression::type codec,
+ ColumnChunkMetaDataBuilder* metadata,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+ : sink_(sink),
+ metadata_(metadata),
+ pool_(pool),
+ num_values_(0),
+ dictionary_page_offset_(0),
+ data_page_offset_(0),
+ total_uncompressed_size_(0),
+ total_compressed_size_(0) {
+ compressor_ = GetCodecFromArrow(codec);
+ }
+
+ virtual ~SerializedPageWriter() = default;
+
+ int64_t WriteDictionaryPage(const DictionaryPage& page) override {
+ int64_t uncompressed_size = page.size();
+ std::shared_ptr<Buffer> compressed_data = nullptr;
+ if (has_compressor()) {
+ auto buffer = std::static_pointer_cast<ResizableBuffer>(
+ AllocateBuffer(pool_, uncompressed_size));
+ Compress(*(page.buffer().get()), buffer.get());
+ compressed_data = std::static_pointer_cast<Buffer>(buffer);
+ } else {
+ compressed_data = page.buffer();
+ }
+
+ format::DictionaryPageHeader dict_page_header;
+ dict_page_header.__set_num_values(page.num_values());
+ dict_page_header.__set_encoding(ToThrift(page.encoding()));
+ dict_page_header.__set_is_sorted(page.is_sorted());
+
+ format::PageHeader page_header;
+ page_header.__set_type(format::PageType::DICTIONARY_PAGE);
+ page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
+ page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
+ page_header.__set_dictionary_page_header(dict_page_header);
+ // TODO(PARQUET-594) crc checksum
+
+ int64_t start_pos = sink_->Tell();
+ if (dictionary_page_offset_ == 0) {
+ dictionary_page_offset_ = start_pos;
+ }
+ int64_t header_size =
+ SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
+ sink_->Write(compressed_data->data(), compressed_data->size());
+
+ total_uncompressed_size_ += uncompressed_size + header_size;
+ total_compressed_size_ += compressed_data->size() + header_size;
+
+ return sink_->Tell() - start_pos;
+ }
+
+ void Close(bool has_dictionary, bool fallback) override {
+ // index_page_offset = 0 since they are not supported
+ metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
+ total_compressed_size_, total_uncompressed_size_, has_dictionary,
+ fallback);
+
+ // Write metadata at end of column chunk
+ metadata_->WriteTo(sink_);
+ }
+
+ /**
+ * Compress a buffer.
+ */
+ void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
+ DCHECK(compressor_ != nullptr);
+
+ // Compress the data
+ int64_t max_compressed_size =
+ compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
+
+ // Use Arrow::Buffer::shrink_to_fit = false
+ // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
+ PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
+
+ int64_t compressed_size;
+ PARQUET_THROW_NOT_OK(
+ compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
+ dest_buffer->mutable_data(), &compressed_size));
+ PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
+ }
+
+ int64_t WriteDataPage(const CompressedDataPage& page) override {
+ int64_t uncompressed_size = page.uncompressed_size();
+ std::shared_ptr<Buffer> compressed_data = page.buffer();
+
+ format::DataPageHeader data_page_header;
+ data_page_header.__set_num_values(page.num_values());
+ data_page_header.__set_encoding(ToThrift(page.encoding()));
+ data_page_header.__set_definition_level_encoding(
+ ToThrift(page.definition_level_encoding()));
+ data_page_header.__set_repetition_level_encoding(
+ ToThrift(page.repetition_level_encoding()));
+ data_page_header.__set_statistics(ToThrift(page.statistics()));
+
+ format::PageHeader page_header;
+ page_header.__set_type(format::PageType::DATA_PAGE);
+ page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
+ page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
+ page_header.__set_data_page_header(data_page_header);
+ // TODO(PARQUET-594) crc checksum
+
+ int64_t start_pos = sink_->Tell();
+ if (data_page_offset_ == 0) {
+ data_page_offset_ = start_pos;
+ }
+
+ int64_t header_size =
+ SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
+ sink_->Write(compressed_data->data(), compressed_data->size());
+
+ total_uncompressed_size_ += uncompressed_size + header_size;
+ total_compressed_size_ += compressed_data->size() + header_size;
+ num_values_ += page.num_values();
+
+ return sink_->Tell() - start_pos;
+ }
+
+ bool has_compressor() override { return (compressor_ != nullptr); }
+
+ private:
+ OutputStream* sink_;
+ ColumnChunkMetaDataBuilder* metadata_;
+ ::arrow::MemoryPool* pool_;
+ int64_t num_values_;
+ int64_t dictionary_page_offset_;
+ int64_t data_page_offset_;
+ int64_t total_uncompressed_size_;
+ int64_t total_compressed_size_;
+
+ // Compression codec to use.
+ std::unique_ptr<::arrow::Codec> compressor_;
+};
+
+std::unique_ptr<PageWriter> PageWriter::Open(OutputStream* sink, Compression::type codec,
+ ColumnChunkMetaDataBuilder* metadata,
+ ::arrow::MemoryPool* pool) {
+ return std::unique_ptr<PageWriter>(
+ new SerializedPageWriter(sink, codec, metadata, pool));
+}
+
+// ----------------------------------------------------------------------
// ColumnWriter
std::shared_ptr<WriterProperties> default_writer_properties() {
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 0408747..f1c13a0 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -22,7 +22,7 @@
#include "parquet/column_page.h"
#include "parquet/encoding.h"
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
@@ -69,6 +69,28 @@
std::unique_ptr<::arrow::BitWriter> bit_packed_encoder_;
};
+class PageWriter {
+ public:
+ virtual ~PageWriter() {}
+
+ static std::unique_ptr<PageWriter> Open(
+ OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+ // The Column Writer decides if dictionary encoding is used if set and
+ // if the dictionary encoding has fallen back to default encoding on reaching dictionary
+ // page limit
+ virtual void Close(bool has_dictionary, bool fallback) = 0;
+
+ virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
+
+ virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
+
+ virtual bool has_compressor() = 0;
+
+ virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
+};
+
static constexpr int WRITE_BATCH_SIZE = 1000;
class PARQUET_EXPORT ColumnWriter {
public:
diff --git a/src/parquet/encoding-benchmark.cc b/src/parquet/encoding-benchmark.cc
index 97eeefa..72c41e5 100644
--- a/src/parquet/encoding-benchmark.cc
+++ b/src/parquet/encoding-benchmark.cc
@@ -18,7 +18,6 @@
#include "benchmark/benchmark.h"
#include "parquet/encoding-internal.h"
-#include "parquet/file/reader-internal.h"
#include "parquet/util/memory.h"
using arrow::default_memory_pool;
@@ -26,7 +25,6 @@
namespace parquet {
-using format::ColumnChunk;
using schema::PrimitiveNode;
namespace benchmark {
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file-deserialize-test.cc
similarity index 96%
rename from src/parquet/file/file-deserialize-test.cc
rename to src/parquet/file-deserialize-test.cc
index 0cab75f..5e17375 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file-deserialize-test.cc
@@ -26,9 +26,9 @@
#include <string>
#include <vector>
-#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
#include "parquet/exception.h"
-#include "parquet/file/reader-internal.h"
+#include "parquet/file_reader.h"
#include "parquet/parquet_types.h"
#include "parquet/thrift.h"
#include "parquet/types.h"
@@ -73,7 +73,7 @@
EndStream();
std::unique_ptr<InputStream> stream;
stream.reset(new InMemoryInputStream(out_buffer_));
- page_reader_.reset(new SerializedPageReader(std::move(stream), num_rows, codec));
+ page_reader_ = PageReader::Open(std::move(stream), num_rows, codec);
}
void WriteDataPageHeader(int max_serialized_len = 1024, int32_t uncompressed_size = 0,
@@ -99,7 +99,7 @@
std::unique_ptr<InMemoryOutputStream> out_stream_;
std::shared_ptr<Buffer> out_buffer_;
- std::unique_ptr<SerializedPageReader> page_reader_;
+ std::unique_ptr<PageReader> page_reader_;
format::PageHeader page_header_;
format::DataPageHeader data_page_header_;
};
@@ -149,7 +149,7 @@
// check header size is between 256 KB to 16 MB
ASSERT_LE(stats_size, out_stream_->Tell());
- ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, out_stream_->Tell());
+ ASSERT_GE(kDefaultMaxPageHeaderSize, out_stream_->Tell());
InitSerializedPageReader(num_rows);
std::shared_ptr<Page> current_page = page_reader_->NextPage();
@@ -249,7 +249,7 @@
auto reader = std::make_shared<BufferReader>(buffer);
auto wrapper = std::unique_ptr<ArrowInputFile>(new ArrowInputFile(reader));
- ASSERT_THROW(reader_->Open(SerializedFile::Open(std::move(wrapper))),
+ ASSERT_THROW(reader_->Open(ParquetFileReader::Contents::Open(std::move(wrapper))),
ParquetException);
}
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file-serialize-test.cc
similarity index 98%
rename from src/parquet/file/file-serialize-test.cc
rename to src/parquet/file-serialize-test.cc
index 4d94d2e..b4df77e 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file-serialize-test.cc
@@ -19,8 +19,8 @@
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
-#include "parquet/file/reader.h"
-#include "parquet/file/writer.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
#include "parquet/test-specialization.h"
#include "parquet/test-util.h"
#include "parquet/types.h"
diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt
deleted file mode 100644
index 82e7c80..0000000
--- a/src/parquet/file/CMakeLists.txt
+++ /dev/null
@@ -1,27 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-install(FILES
- metadata.h
- printer.h
- reader.h
- writer.h
- DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet/file")
-
-ADD_PARQUET_TEST(file-deserialize-test)
-ADD_PARQUET_TEST(file-metadata-test)
-ADD_PARQUET_TEST(file-serialize-test)
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
deleted file mode 100644
index bc14ec9..0000000
--- a/src/parquet/file/reader-internal.cc
+++ /dev/null
@@ -1,309 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/reader-internal.h"
-
-#include <string.h>
-#include <algorithm>
-#include <exception>
-#include <ostream>
-#include <string>
-#include <vector>
-
-#include "arrow/util/compression.h"
-
-#include "parquet/column_page.h"
-#include "parquet/exception.h"
-#include "parquet/schema.h"
-#include "parquet/thrift.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-
-using arrow::MemoryPool;
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// SerializedPageReader deserializes Thrift metadata and pages that have been
-// assembled in a serialized stream for storing in a Parquet files
-
-SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
- int64_t total_num_rows,
- Compression::type codec, MemoryPool* pool)
- : stream_(std::move(stream)),
- decompression_buffer_(AllocateBuffer(pool, 0)),
- seen_num_rows_(0),
- total_num_rows_(total_num_rows) {
- max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
- decompressor_ = GetCodecFromArrow(codec);
-}
-
-std::shared_ptr<Page> SerializedPageReader::NextPage() {
- // Loop here because there may be unhandled page types that we skip until
- // finding a page that we do know what to do with
- while (seen_num_rows_ < total_num_rows_) {
- int64_t bytes_read = 0;
- int64_t bytes_available = 0;
- uint32_t header_size = 0;
- const uint8_t* buffer;
- uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE;
-
- // Page headers can be very large because of page statistics
- // We try to deserialize a larger buffer progressively
- // until a maximum allowed header limit
- while (true) {
- buffer = stream_->Peek(allowed_page_size, &bytes_available);
- if (bytes_available == 0) {
- return std::shared_ptr<Page>(nullptr);
- }
-
- // This gets used, then set by DeserializeThriftMsg
- header_size = static_cast<uint32_t>(bytes_available);
- try {
- DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_);
- break;
- } catch (std::exception& e) {
- // Failed to deserialize. Double the allowed page header size and try again
- std::stringstream ss;
- ss << e.what();
- allowed_page_size *= 2;
- if (allowed_page_size > max_page_header_size_) {
- ss << "Deserializing page header failed.\n";
- throw ParquetException(ss.str());
- }
- }
- }
- // Advance the stream offset
- stream_->Advance(header_size);
-
- int compressed_len = current_page_header_.compressed_page_size;
- int uncompressed_len = current_page_header_.uncompressed_page_size;
-
- // Read the compressed data page.
- buffer = stream_->Read(compressed_len, &bytes_read);
- if (bytes_read != compressed_len) {
- ParquetException::EofException();
- }
-
- // Uncompress it if we need to
- if (decompressor_ != nullptr) {
- // Grow the uncompressed buffer if we need to.
- if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
- PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
- }
- PARQUET_THROW_NOT_OK(
- decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
- decompression_buffer_->mutable_data()));
- buffer = decompression_buffer_->data();
- }
-
- auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
-
- if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) {
- const format::DictionaryPageHeader& dict_header =
- current_page_header_.dictionary_page_header;
-
- bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
-
- return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
- FromThrift(dict_header.encoding),
- is_sorted);
- } else if (current_page_header_.type == format::PageType::DATA_PAGE) {
- const format::DataPageHeader& header = current_page_header_.data_page_header;
-
- EncodedStatistics page_statistics;
- if (header.__isset.statistics) {
- const format::Statistics& stats = header.statistics;
- if (stats.__isset.max) {
- page_statistics.set_max(stats.max);
- }
- if (stats.__isset.min) {
- page_statistics.set_min(stats.min);
- }
- if (stats.__isset.null_count) {
- page_statistics.set_null_count(stats.null_count);
- }
- if (stats.__isset.distinct_count) {
- page_statistics.set_distinct_count(stats.distinct_count);
- }
- }
-
- seen_num_rows_ += header.num_values;
-
- return std::make_shared<DataPage>(
- page_buffer, header.num_values, FromThrift(header.encoding),
- FromThrift(header.definition_level_encoding),
- FromThrift(header.repetition_level_encoding), page_statistics);
- } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
- const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
- bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;
-
- seen_num_rows_ += header.num_values;
-
- return std::make_shared<DataPageV2>(
- page_buffer, header.num_values, header.num_nulls, header.num_rows,
- FromThrift(header.encoding), header.definition_levels_byte_length,
- header.repetition_levels_byte_length, is_compressed);
- } else {
- // We don't know what this page type is. We're allowed to skip non-data
- // pages.
- continue;
- }
- }
- return std::shared_ptr<Page>(nullptr);
-}
-
-SerializedRowGroup::SerializedRowGroup(RandomAccessSource* source,
- FileMetaData* file_metadata, int row_group_number,
- const ReaderProperties& props)
- : source_(source), file_metadata_(file_metadata), properties_(props) {
- row_group_metadata_ = file_metadata->RowGroup(row_group_number);
-}
-const RowGroupMetaData* SerializedRowGroup::metadata() const {
- return row_group_metadata_.get();
-}
-
-const ReaderProperties* SerializedRowGroup::properties() const { return &properties_; }
-
-// For PARQUET-816
-static constexpr int64_t kMaxDictHeaderSize = 100;
-
-std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
- // Read column chunk from the file
- auto col = row_group_metadata_->ColumnChunk(i);
-
- int64_t col_start = col->data_page_offset();
- if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) {
- col_start = col->dictionary_page_offset();
- }
-
- int64_t col_length = col->total_compressed_size();
- std::unique_ptr<InputStream> stream;
-
- // PARQUET-816 workaround for old files created by older parquet-mr
- const ApplicationVersion& version = file_metadata_->writer_version();
- if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION)) {
- // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
- // dictionary page header size in total_compressed_size and total_uncompressed_size
- // (see IMPALA-694). We add padding to compensate.
- int64_t bytes_remaining = source_->Size() - (col_start + col_length);
- int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
- col_length += padding;
- }
-
- stream = properties_.GetStream(source_, col_start, col_length);
-
- return std::unique_ptr<PageReader>(
- new SerializedPageReader(std::move(stream), col->num_values(), col->compression(),
- properties_.memory_pool()));
-}
-
-// ----------------------------------------------------------------------
-// SerializedFile: Parquet on-disk layout
-
-// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
-static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024;
-static constexpr uint32_t FOOTER_SIZE = 8;
-static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
- std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
- const std::shared_ptr<FileMetaData>& metadata) {
- std::unique_ptr<ParquetFileReader::Contents> result(
- new SerializedFile(std::move(source), props));
-
- // Access private methods here, but otherwise unavailable
- SerializedFile* file = static_cast<SerializedFile*>(result.get());
-
- if (metadata == nullptr) {
- // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
- file->ParseMetaData();
- } else {
- file->file_metadata_ = metadata;
- }
-
- return result;
-}
-
-void SerializedFile::Close() { source_->Close(); }
-
-SerializedFile::~SerializedFile() {
- try {
- Close();
- } catch (...) {
- }
-}
-
-std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
- std::unique_ptr<SerializedRowGroup> contents(
- new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_));
- return std::make_shared<RowGroupReader>(std::move(contents));
-}
-
-std::shared_ptr<FileMetaData> SerializedFile::metadata() const { return file_metadata_; }
-
-SerializedFile::SerializedFile(
- std::unique_ptr<RandomAccessSource> source,
- const ReaderProperties& props = default_reader_properties())
- : source_(std::move(source)), properties_(props) {}
-
-void SerializedFile::ParseMetaData() {
- int64_t file_size = source_->Size();
-
- if (file_size < FOOTER_SIZE) {
- throw ParquetException("Corrupted file, smaller than file footer");
- }
-
- uint8_t footer_buffer[DEFAULT_FOOTER_READ_SIZE];
- int64_t footer_read_size = std::min(file_size, DEFAULT_FOOTER_READ_SIZE);
- int64_t bytes_read =
- source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer);
-
- // Check if all bytes are read. Check if last 4 bytes read have the magic bits
- if (bytes_read != footer_read_size ||
- memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) {
- throw ParquetException("Invalid parquet file. Corrupt footer.");
- }
-
- uint32_t metadata_len =
- *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
- int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
- if (FOOTER_SIZE + metadata_len > file_size) {
- throw ParquetException(
- "Invalid parquet file. File is less than "
- "file metadata size.");
- }
-
- std::shared_ptr<PoolBuffer> metadata_buffer =
- AllocateBuffer(properties_.memory_pool(), metadata_len);
-
- // Check if the footer_buffer contains the entire metadata
- if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
- memcpy(metadata_buffer->mutable_data(),
- footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE), metadata_len);
- } else {
- bytes_read =
- source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
- if (bytes_read != metadata_len) {
- throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
- }
- }
-
- file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
-}
-
-} // namespace parquet
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
deleted file mode 100644
index 282c534..0000000
--- a/src/parquet/file/reader-internal.h
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_FILE_READER_INTERNAL_H
-#define PARQUET_FILE_READER_INTERNAL_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
-#include "parquet/file/reader.h"
-#include "parquet/parquet_types.h"
-#include "parquet/properties.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace arrow {
-
-class Codec;
-};
-
-namespace parquet {
-
-// 16 MB is the default maximum page header size
-static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024;
-
-// 16 KB is the default expected page header size
-static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
-
-// This subclass delimits pages appearing in a serialized stream, each preceded
-// by a serialized Thrift format::PageHeader indicating the type of each page
-// and the page metadata.
-class PARQUET_EXPORT SerializedPageReader : public PageReader {
- public:
- SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t num_rows,
- Compression::type codec,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
- virtual ~SerializedPageReader() {}
-
- // Implement the PageReader interface
- virtual std::shared_ptr<Page> NextPage();
-
- void set_max_page_header_size(uint32_t size) { max_page_header_size_ = size; }
-
- private:
- std::unique_ptr<InputStream> stream_;
-
- format::PageHeader current_page_header_;
- std::shared_ptr<Page> current_page_;
-
- // Compression codec to use.
- std::unique_ptr<::arrow::Codec> decompressor_;
- std::shared_ptr<PoolBuffer> decompression_buffer_;
-
- // Maximum allowed page size
- uint32_t max_page_header_size_;
-
- // Number of rows read in data pages so far
- int64_t seen_num_rows_;
-
- // Number of rows in all the data pages
- int64_t total_num_rows_;
-};
-
-// RowGroupReader::Contents implementation for the Parquet file specification
-class PARQUET_EXPORT SerializedRowGroup : public RowGroupReader::Contents {
- public:
- SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
- int row_group_number, const ReaderProperties& props);
-
- virtual const RowGroupMetaData* metadata() const;
-
- virtual const ReaderProperties* properties() const;
-
- virtual std::unique_ptr<PageReader> GetColumnPageReader(int i);
-
- private:
- RandomAccessSource* source_;
- FileMetaData* file_metadata_;
- std::unique_ptr<RowGroupMetaData> row_group_metadata_;
- ReaderProperties properties_;
-};
-
-// An implementation of ParquetFileReader::Contents that deals with the Parquet
-// file structure, Thrift deserialization, and other internal matters
-
-class PARQUET_EXPORT SerializedFile : public ParquetFileReader::Contents {
- public:
- // Open the file. If no metadata is passed, it is parsed from the footer of
- // the file
- static std::unique_ptr<ParquetFileReader::Contents> Open(
- std::unique_ptr<RandomAccessSource> source,
- const ReaderProperties& props = default_reader_properties(),
- const std::shared_ptr<FileMetaData>& metadata = nullptr);
-
- void Close() override;
- std::shared_ptr<RowGroupReader> GetRowGroup(int i) override;
- std::shared_ptr<FileMetaData> metadata() const override;
- virtual ~SerializedFile();
-
- private:
- // This class takes ownership of the provided data source
- explicit SerializedFile(std::unique_ptr<RandomAccessSource> source,
- const ReaderProperties& props);
-
- std::unique_ptr<RandomAccessSource> source_;
- std::shared_ptr<FileMetaData> file_metadata_;
- ReaderProperties properties_;
-
- void ParseMetaData();
-};
-
-} // namespace parquet
-
-#endif // PARQUET_FILE_READER_INTERNAL_H
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
deleted file mode 100644
index 9b9bde9..0000000
--- a/src/parquet/file/reader.cc
+++ /dev/null
@@ -1,192 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/reader.h"
-
-#include <cstdio>
-#include <memory>
-#include <sstream>
-#include <string>
-#include <utility>
-
-#include "arrow/io/file.h"
-
-#include "parquet/column_page.h"
-#include "parquet/column_reader.h"
-#include "parquet/column_scanner.h"
-#include "parquet/exception.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/types.h"
-#include "parquet/util/logging.h"
-#include "parquet/util/memory.h"
-
-using std::string;
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// RowGroupReader public API
-
-RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents)
- : contents_(std::move(contents)) {}
-
-std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
- DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
- << metadata()->num_columns()
- << "columns, requested column: " << i;
- const ColumnDescriptor* descr = metadata()->schema()->Column(i);
-
- std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
- return ColumnReader::Make(
- descr, std::move(page_reader),
- const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
-}
-
-std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
- DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
- << metadata()->num_columns()
- << "columns, requested column: " << i;
- return contents_->GetColumnPageReader(i);
-}
-
-// Returns the rowgroup metadata
-const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
-
-// ----------------------------------------------------------------------
-// ParquetFileReader public API
-
-ParquetFileReader::ParquetFileReader() {}
-ParquetFileReader::~ParquetFileReader() {
- try {
- Close();
- } catch (...) {
- }
-}
-
-std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
- const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
- const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) {
- std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source));
- return Open(std::move(io_wrapper), props, metadata);
-}
-
-std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
- std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
- const std::shared_ptr<FileMetaData>& metadata) {
- auto contents = SerializedFile::Open(std::move(source), props, metadata);
- std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
- result->Open(std::move(contents));
- return result;
-}
-
-std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
- const std::string& path, bool memory_map, const ReaderProperties& props,
- const std::shared_ptr<FileMetaData>& metadata) {
- std::shared_ptr<::arrow::io::ReadableFileInterface> source;
- if (memory_map) {
- std::shared_ptr<::arrow::io::ReadableFile> handle;
- PARQUET_THROW_NOT_OK(
- ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle));
- source = handle;
- } else {
- std::shared_ptr<::arrow::io::MemoryMappedFile> handle;
- PARQUET_THROW_NOT_OK(
- ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle));
- source = handle;
- }
-
- return Open(source, props, metadata);
-}
-
-void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
- contents_ = std::move(contents);
-}
-
-void ParquetFileReader::Close() {
- if (contents_) {
- contents_->Close();
- }
-}
-
-std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
- return contents_->metadata();
-}
-
-std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
- DCHECK(i < metadata()->num_row_groups()) << "The file only has "
- << metadata()->num_row_groups()
- << "row groups, requested reader for: " << i;
- return contents_->GetRowGroup(i);
-}
-
-// ----------------------------------------------------------------------
-// File metadata helpers
-
-std::shared_ptr<FileMetaData> ReadMetaData(
- const std::shared_ptr<::arrow::io::ReadableFileInterface>& source) {
- return ParquetFileReader::Open(source)->metadata();
-}
-
-// ----------------------------------------------------------------------
-// File scanner for performance testing
-
-int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
- ParquetFileReader* reader) {
- std::vector<int16_t> rep_levels(column_batch_size);
- std::vector<int16_t> def_levels(column_batch_size);
-
- int num_columns = static_cast<int>(columns.size());
-
- // columns are not specified explicitly. Add all columns
- if (columns.size() == 0) {
- num_columns = reader->metadata()->num_columns();
- columns.resize(num_columns);
- for (int i = 0; i < num_columns; i++) {
- columns[i] = i;
- }
- }
-
- std::vector<int64_t> total_rows(num_columns, 0);
-
- for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) {
- auto group_reader = reader->RowGroup(r);
- int col = 0;
- for (auto i : columns) {
- std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
- size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
- std::vector<uint8_t> values(column_batch_size * value_byte_size);
-
- int64_t values_read = 0;
- while (col_reader->HasNext()) {
- total_rows[col] +=
- ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
- values.data(), &values_read, col_reader.get());
- }
- col++;
- }
- }
-
- for (int i = 1; i < num_columns; ++i) {
- if (total_rows[0] != total_rows[i]) {
- throw ParquetException("Parquet error: Total rows among columns do not match");
- }
- }
-
- return total_rows[0];
-}
-
-} // namespace parquet
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
deleted file mode 100644
index 712289b..0000000
--- a/src/parquet/file/writer-internal.cc
+++ /dev/null
@@ -1,327 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/writer-internal.h"
-
-#include <cstdint>
-#include <memory>
-
-#include "arrow/util/compression.h"
-
-#include "parquet/column_writer.h"
-#include "parquet/schema-internal.h"
-#include "parquet/schema.h"
-#include "parquet/thrift.h"
-#include "parquet/util/memory.h"
-
-using arrow::MemoryPool;
-
-using parquet::schema::GroupNode;
-using parquet::schema::SchemaFlattener;
-
-namespace parquet {
-
-// FIXME: copied from reader-internal.cc
-static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-// ----------------------------------------------------------------------
-// SerializedPageWriter
-
-SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type codec,
- ColumnChunkMetaDataBuilder* metadata,
- MemoryPool* pool)
- : sink_(sink),
- metadata_(metadata),
- pool_(pool),
- num_values_(0),
- dictionary_page_offset_(0),
- data_page_offset_(0),
- total_uncompressed_size_(0),
- total_compressed_size_(0) {
- compressor_ = GetCodecFromArrow(codec);
-}
-
-static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) {
- format::Statistics statistics;
- if (row_group_statistics.has_min) statistics.__set_min(row_group_statistics.min());
- if (row_group_statistics.has_max) statistics.__set_max(row_group_statistics.max());
- if (row_group_statistics.has_null_count)
- statistics.__set_null_count(row_group_statistics.null_count);
- if (row_group_statistics.has_distinct_count)
- statistics.__set_distinct_count(row_group_statistics.distinct_count);
- return statistics;
-}
-
-void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
- // index_page_offset = 0 since they are not supported
- metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
- total_compressed_size_, total_uncompressed_size_, has_dictionary,
- fallback);
-
- // Write metadata at end of column chunk
- metadata_->WriteTo(sink_);
-}
-
-void SerializedPageWriter::Compress(const Buffer& src_buffer,
- ResizableBuffer* dest_buffer) {
- DCHECK(compressor_ != nullptr);
-
- // Compress the data
- int64_t max_compressed_size =
- compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
-
- // Use Arrow::Buffer::shrink_to_fit = false
- // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
- PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
-
- int64_t compressed_size;
- PARQUET_THROW_NOT_OK(
- compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
- dest_buffer->mutable_data(), &compressed_size));
- PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
-}
-
-int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
- int64_t uncompressed_size = page.uncompressed_size();
- std::shared_ptr<Buffer> compressed_data = page.buffer();
-
- format::DataPageHeader data_page_header;
- data_page_header.__set_num_values(page.num_values());
- data_page_header.__set_encoding(ToThrift(page.encoding()));
- data_page_header.__set_definition_level_encoding(
- ToThrift(page.definition_level_encoding()));
- data_page_header.__set_repetition_level_encoding(
- ToThrift(page.repetition_level_encoding()));
- data_page_header.__set_statistics(ToThrift(page.statistics()));
-
- format::PageHeader page_header;
- page_header.__set_type(format::PageType::DATA_PAGE);
- page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
- page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
- page_header.__set_data_page_header(data_page_header);
- // TODO(PARQUET-594) crc checksum
-
- int64_t start_pos = sink_->Tell();
- if (data_page_offset_ == 0) {
- data_page_offset_ = start_pos;
- }
-
- int64_t header_size =
- SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
- sink_->Write(compressed_data->data(), compressed_data->size());
-
- total_uncompressed_size_ += uncompressed_size + header_size;
- total_compressed_size_ += compressed_data->size() + header_size;
- num_values_ += page.num_values();
-
- return sink_->Tell() - start_pos;
-}
-
-int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
- int64_t uncompressed_size = page.size();
- std::shared_ptr<Buffer> compressed_data = nullptr;
- if (has_compressor()) {
- auto buffer = std::static_pointer_cast<ResizableBuffer>(
- AllocateBuffer(pool_, uncompressed_size));
- Compress(*(page.buffer().get()), buffer.get());
- compressed_data = std::static_pointer_cast<Buffer>(buffer);
- } else {
- compressed_data = page.buffer();
- }
-
- format::DictionaryPageHeader dict_page_header;
- dict_page_header.__set_num_values(page.num_values());
- dict_page_header.__set_encoding(ToThrift(page.encoding()));
- dict_page_header.__set_is_sorted(page.is_sorted());
-
- format::PageHeader page_header;
- page_header.__set_type(format::PageType::DICTIONARY_PAGE);
- page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
- page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
- page_header.__set_dictionary_page_header(dict_page_header);
- // TODO(PARQUET-594) crc checksum
-
- int64_t start_pos = sink_->Tell();
- if (dictionary_page_offset_ == 0) {
- dictionary_page_offset_ = start_pos;
- }
- int64_t header_size =
- SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
- sink_->Write(compressed_data->data(), compressed_data->size());
-
- total_uncompressed_size_ += uncompressed_size + header_size;
- total_compressed_size_ += compressed_data->size() + header_size;
-
- return sink_->Tell() - start_pos;
-}
-
-// ----------------------------------------------------------------------
-// RowGroupSerializer
-
-int RowGroupSerializer::num_columns() const { return metadata_->num_columns(); }
-
-int64_t RowGroupSerializer::num_rows() const {
- if (current_column_writer_) {
- CheckRowsWritten();
- }
- return num_rows_ < 0 ? 0 : num_rows_;
-}
-
-void RowGroupSerializer::CheckRowsWritten() const {
- int64_t current_rows = current_column_writer_->rows_written();
- if (num_rows_ < 0) {
- num_rows_ = current_rows;
- metadata_->set_num_rows(current_rows);
- } else if (num_rows_ != current_rows) {
- std::stringstream ss;
- ss << "Column " << current_column_index_ << " had " << current_rows
- << " while previous column had " << num_rows_;
- throw ParquetException(ss.str());
- }
-}
-
-ColumnWriter* RowGroupSerializer::NextColumn() {
- if (current_column_writer_) {
- CheckRowsWritten();
- }
-
- // Throws an error if more columns are being written
- auto col_meta = metadata_->NextColumnChunk();
-
- if (current_column_writer_) {
- total_bytes_written_ += current_column_writer_->Close();
- }
-
- ++current_column_index_;
-
- const ColumnDescriptor* column_descr = col_meta->descr();
- std::unique_ptr<PageWriter> pager(
- new SerializedPageWriter(sink_, properties_->compression(column_descr->path()),
- col_meta, properties_->memory_pool()));
- current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
- return current_column_writer_.get();
-}
-
-int RowGroupSerializer::current_column() const { return metadata_->current_column(); }
-
-void RowGroupSerializer::Close() {
- if (!closed_) {
- closed_ = true;
-
- if (current_column_writer_) {
- CheckRowsWritten();
- total_bytes_written_ += current_column_writer_->Close();
- current_column_writer_.reset();
- }
-
- // Ensures all columns have been written
- metadata_->Finish(total_bytes_written_);
- }
-}
-
-// ----------------------------------------------------------------------
-// FileSerializer
-
-std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
- const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
- const std::shared_ptr<WriterProperties>& properties,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
- std::unique_ptr<ParquetFileWriter::Contents> result(
- new FileSerializer(sink, schema, properties, key_value_metadata));
-
- return result;
-}
-
-void FileSerializer::Close() {
- if (is_open_) {
- if (row_group_writer_) {
- num_rows_ += row_group_writer_->num_rows();
- row_group_writer_->Close();
- }
- row_group_writer_.reset();
-
- // Write magic bytes and metadata
- WriteMetaData();
-
- sink_->Close();
- is_open_ = false;
- }
-}
-
-int FileSerializer::num_columns() const { return schema_.num_columns(); }
-
-int FileSerializer::num_row_groups() const { return num_row_groups_; }
-
-int64_t FileSerializer::num_rows() const { return num_rows_; }
-
-const std::shared_ptr<WriterProperties>& FileSerializer::properties() const {
- return properties_;
-}
-
-RowGroupWriter* FileSerializer::AppendRowGroup() {
- if (row_group_writer_) {
- row_group_writer_->Close();
- }
- num_row_groups_++;
- auto rg_metadata = metadata_->AppendRowGroup();
- std::unique_ptr<RowGroupWriter::Contents> contents(
- new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
- row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
- return row_group_writer_.get();
-}
-
-FileSerializer::~FileSerializer() {
- try {
- Close();
- } catch (...) {
- }
-}
-
-void FileSerializer::WriteMetaData() {
- // Write MetaData
- uint32_t metadata_len = static_cast<uint32_t>(sink_->Tell());
-
- // Get a FileMetaData
- auto metadata = metadata_->Finish();
- metadata->WriteTo(sink_.get());
- metadata_len = static_cast<uint32_t>(sink_->Tell()) - metadata_len;
-
- // Write Footer
- sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
- sink_->Write(PARQUET_MAGIC, 4);
-}
-
-FileSerializer::FileSerializer(
- const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
- const std::shared_ptr<WriterProperties>& properties,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
- : ParquetFileWriter::Contents(schema, key_value_metadata),
- sink_(sink),
- is_open_(true),
- properties_(properties),
- num_row_groups_(0),
- num_rows_(0),
- metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
- StartFile();
-}
-
-void FileSerializer::StartFile() {
- // Parquet files always start with PAR1
- sink_->Write(PARQUET_MAGIC, 4);
-}
-
-} // namespace parquet
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
deleted file mode 100644
index 3cd73fe..0000000
--- a/src/parquet/file/writer-internal.h
+++ /dev/null
@@ -1,153 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_FILE_WRITER_INTERNAL_H
-#define PARQUET_FILE_WRITER_INTERNAL_H
-
-#include <memory>
-#include <vector>
-
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
-#include "parquet/file/writer.h"
-#include "parquet/parquet_types.h"
-#include "parquet/util/memory.h"
-
-namespace arrow {
-
-class Codec;
-};
-
-namespace parquet {
-
-// This subclass delimits pages appearing in a serialized stream, each preceded
-// by a serialized Thrift format::PageHeader indicating the type of each page
-// and the page metadata.
-class SerializedPageWriter : public PageWriter {
- public:
- SerializedPageWriter(OutputStream* sink, Compression::type codec,
- ColumnChunkMetaDataBuilder* metadata,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
- virtual ~SerializedPageWriter() {}
-
- int64_t WriteDataPage(const CompressedDataPage& page) override;
-
- int64_t WriteDictionaryPage(const DictionaryPage& page) override;
-
- /**
- * Compress a buffer.
- */
- void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override;
-
- bool has_compressor() override { return (compressor_ != nullptr); }
-
- void Close(bool has_dictionary, bool fallback) override;
-
- private:
- OutputStream* sink_;
- ColumnChunkMetaDataBuilder* metadata_;
- ::arrow::MemoryPool* pool_;
- int64_t num_values_;
- int64_t dictionary_page_offset_;
- int64_t data_page_offset_;
- int64_t total_uncompressed_size_;
- int64_t total_compressed_size_;
-
- // Compression codec to use.
- std::unique_ptr<::arrow::Codec> compressor_;
-};
-
-// RowGroupWriter::Contents implementation for the Parquet file specification
-class RowGroupSerializer : public RowGroupWriter::Contents {
- public:
- RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
- const WriterProperties* properties)
- : sink_(sink),
- metadata_(metadata),
- properties_(properties),
- total_bytes_written_(0),
- closed_(false),
- current_column_index_(0),
- num_rows_(-1) {}
-
- int num_columns() const override;
- int64_t num_rows() const override;
-
- ColumnWriter* NextColumn() override;
- int current_column() const override;
- void Close() override;
-
- private:
- OutputStream* sink_;
- mutable RowGroupMetaDataBuilder* metadata_;
- const WriterProperties* properties_;
- int64_t total_bytes_written_;
- bool closed_;
- int current_column_index_;
- mutable int64_t num_rows_;
-
- void CheckRowsWritten() const;
-
- std::shared_ptr<ColumnWriter> current_column_writer_;
-};
-
-// An implementation of ParquetFileWriter::Contents that deals with the Parquet
-// file structure, Thrift serialization, and other internal matters
-
-class FileSerializer : public ParquetFileWriter::Contents {
- public:
- static std::unique_ptr<ParquetFileWriter::Contents> Open(
- const std::shared_ptr<OutputStream>& sink,
- const std::shared_ptr<schema::GroupNode>& schema,
- const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr);
-
- void Close() override;
-
- RowGroupWriter* AppendRowGroup() override;
-
- const std::shared_ptr<WriterProperties>& properties() const override;
-
- int num_columns() const override;
- int num_row_groups() const override;
- int64_t num_rows() const override;
-
- virtual ~FileSerializer();
-
- private:
- explicit FileSerializer(
- const std::shared_ptr<OutputStream>& sink,
- const std::shared_ptr<schema::GroupNode>& schema,
- const std::shared_ptr<WriterProperties>& properties,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
-
- std::shared_ptr<OutputStream> sink_;
- bool is_open_;
- const std::shared_ptr<WriterProperties> properties_;
- int num_row_groups_;
- int64_t num_rows_;
- std::unique_ptr<FileMetaDataBuilder> metadata_;
- std::unique_ptr<RowGroupWriter> row_group_writer_;
-
- void StartFile();
- void WriteMetaData();
-};
-
-} // namespace parquet
-
-#endif // PARQUET_FILE_WRITER_INTERNAL_H
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
deleted file mode 100644
index a91553b..0000000
--- a/src/parquet/file/writer.cc
+++ /dev/null
@@ -1,119 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/writer.h"
-
-#include "parquet/file/writer-internal.h"
-#include "parquet/util/memory.h"
-
-using parquet::schema::GroupNode;
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// RowGroupWriter public API
-
-RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
- : contents_(std::move(contents)) {}
-
-void RowGroupWriter::Close() {
- if (contents_) {
- contents_->Close();
- }
-}
-
-ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
-
-int RowGroupWriter::current_column() { return contents_->current_column(); }
-
-int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
-
-int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
-
-// ----------------------------------------------------------------------
-// ParquetFileWriter public API
-
-ParquetFileWriter::ParquetFileWriter() {}
-
-ParquetFileWriter::~ParquetFileWriter() {
- try {
- Close();
- } catch (...) {
- }
-}
-
-std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
- const std::shared_ptr<::arrow::io::OutputStream>& sink,
- const std::shared_ptr<GroupNode>& schema,
- const std::shared_ptr<WriterProperties>& properties,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
- return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
- key_value_metadata);
-}
-
-std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
- const std::shared_ptr<OutputStream>& sink,
- const std::shared_ptr<schema::GroupNode>& schema,
- const std::shared_ptr<WriterProperties>& properties,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
- auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
- std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
- result->Open(std::move(contents));
- return result;
-}
-
-const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
-
-const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
- return contents_->schema()->Column(i);
-}
-
-int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
-
-int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
-
-int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
-
-const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
- const {
- return contents_->key_value_metadata();
-}
-
-void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
- contents_ = std::move(contents);
-}
-
-void ParquetFileWriter::Close() {
- if (contents_) {
- contents_->Close();
- contents_.reset();
- }
-}
-
-RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
- return contents_->AppendRowGroup();
-}
-
-RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
- return AppendRowGroup();
-}
-
-const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
- return contents_->properties();
-}
-
-} // namespace parquet
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
new file mode 100644
index 0000000..72c71c6
--- /dev/null
+++ b/src/parquet/file_reader.cc
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_reader.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/file.h"
+
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/parquet_types.h"
+#include "parquet/properties.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
+
+using std::string;
+
+namespace arrow {
+
+class Codec;
+
+} // namespace arrow
+
+namespace parquet {
+
+// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
+static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024;
+static constexpr uint32_t FOOTER_SIZE = 8;
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// For PARQUET-816
+static constexpr int64_t kMaxDictHeaderSize = 100;
+
+// ----------------------------------------------------------------------
+// RowGroupReader public API
+
+RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents)
+ : contents_(std::move(contents)) {}
+
+std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
+ DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+ << metadata()->num_columns()
+                                         << " columns, requested column: " << i;
+ const ColumnDescriptor* descr = metadata()->schema()->Column(i);
+
+ std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
+ return ColumnReader::Make(
+ descr, std::move(page_reader),
+ const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
+}
+
+std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
+ DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+ << metadata()->num_columns()
+                                         << " columns, requested column: " << i;
+ return contents_->GetColumnPageReader(i);
+}
+
+// Returns the rowgroup metadata
+const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
+
+// RowGroupReader::Contents implementation for the Parquet file specification
+class SerializedRowGroup : public RowGroupReader::Contents {
+ public:
+ SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
+ int row_group_number, const ReaderProperties& props)
+ : source_(source), file_metadata_(file_metadata), properties_(props) {
+ row_group_metadata_ = file_metadata->RowGroup(row_group_number);
+ }
+
+ const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); }
+
+ const ReaderProperties* properties() const override { return &properties_; }
+
+ std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
+ // Read column chunk from the file
+ auto col = row_group_metadata_->ColumnChunk(i);
+
+ int64_t col_start = col->data_page_offset();
+ if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) {
+ col_start = col->dictionary_page_offset();
+ }
+
+ int64_t col_length = col->total_compressed_size();
+ std::unique_ptr<InputStream> stream;
+
+ // PARQUET-816 workaround for old files created by older parquet-mr
+ const ApplicationVersion& version = file_metadata_->writer_version();
+ if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION)) {
+ // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+ // dictionary page header size in total_compressed_size and total_uncompressed_size
+ // (see IMPALA-694). We add padding to compensate.
+ int64_t bytes_remaining = source_->Size() - (col_start + col_length);
+ int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
+ col_length += padding;
+ }
+
+ stream = properties_.GetStream(source_, col_start, col_length);
+
+ return PageReader::Open(std::move(stream), col->num_values(), col->compression(),
+ properties_.memory_pool());
+ }
+
+ private:
+ RandomAccessSource* source_;
+ FileMetaData* file_metadata_;
+ std::unique_ptr<RowGroupMetaData> row_group_metadata_;
+ ReaderProperties properties_;
+};
+
+// ----------------------------------------------------------------------
+// SerializedFile: An implementation of ParquetFileReader::Contents that deals
+// with the Parquet file structure, Thrift deserialization, and other internal
+// matters
+
+// This class takes ownership of the provided data source
+class SerializedFile : public ParquetFileReader::Contents {
+ public:
+ SerializedFile(std::unique_ptr<RandomAccessSource> source,
+ const ReaderProperties& props = default_reader_properties())
+ : source_(std::move(source)), properties_(props) {}
+
+ ~SerializedFile() {
+ try {
+ Close();
+ } catch (...) {
+ }
+ }
+
+ void Close() override { source_->Close(); }
+
+ std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
+ std::unique_ptr<SerializedRowGroup> contents(
+ new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_));
+ return std::make_shared<RowGroupReader>(std::move(contents));
+ }
+
+ std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }
+
+ void set_metadata(const std::shared_ptr<FileMetaData>& metadata) {
+ file_metadata_ = metadata;
+ }
+
+ void ParseMetaData() {
+ int64_t file_size = source_->Size();
+
+ if (file_size < FOOTER_SIZE) {
+ throw ParquetException("Corrupted file, smaller than file footer");
+ }
+
+ uint8_t footer_buffer[DEFAULT_FOOTER_READ_SIZE];
+ int64_t footer_read_size = std::min(file_size, DEFAULT_FOOTER_READ_SIZE);
+ int64_t bytes_read =
+ source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer);
+
+ // Check if all bytes are read. Check if last 4 bytes read have the magic bits
+ if (bytes_read != footer_read_size ||
+ memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) {
+ throw ParquetException("Invalid parquet file. Corrupt footer.");
+ }
+
+ uint32_t metadata_len =
+ *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
+ int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
+ if (FOOTER_SIZE + metadata_len > file_size) {
+ throw ParquetException(
+ "Invalid parquet file. File is less than "
+ "file metadata size.");
+ }
+
+ std::shared_ptr<PoolBuffer> metadata_buffer =
+ AllocateBuffer(properties_.memory_pool(), metadata_len);
+
+ // Check if the footer_buffer contains the entire metadata
+ if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
+ memcpy(metadata_buffer->mutable_data(),
+ footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE),
+ metadata_len);
+ } else {
+ bytes_read =
+ source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
+ if (bytes_read != metadata_len) {
+ throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
+ }
+ }
+
+ file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
+ }
+
+ private:
+ std::unique_ptr<RandomAccessSource> source_;
+ std::shared_ptr<FileMetaData> file_metadata_;
+ ReaderProperties properties_;
+};
+
+// ----------------------------------------------------------------------
+// ParquetFileReader public API
+
+ParquetFileReader::ParquetFileReader() {}
+
+ParquetFileReader::~ParquetFileReader() {
+ try {
+ Close();
+ } catch (...) {
+ }
+}
+
+// Open the file. If no metadata is passed, it is parsed from the footer of
+// the file
+std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
+ std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
+ const std::shared_ptr<FileMetaData>& metadata) {
+ std::unique_ptr<ParquetFileReader::Contents> result(
+ new SerializedFile(std::move(source), props));
+
+ // Access private methods here, but otherwise unavailable
+ SerializedFile* file = static_cast<SerializedFile*>(result.get());
+
+ if (metadata == nullptr) {
+ // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
+ file->ParseMetaData();
+ } else {
+ file->set_metadata(metadata);
+ }
+
+ return result;
+}
+
+std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
+ const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
+ const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) {
+ std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source));
+ return Open(std::move(io_wrapper), props, metadata);
+}
+
+std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
+ std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
+ const std::shared_ptr<FileMetaData>& metadata) {
+ auto contents = SerializedFile::Open(std::move(source), props, metadata);
+ std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
+ result->Open(std::move(contents));
+ return result;
+}
+
+std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
+ const std::string& path, bool memory_map, const ReaderProperties& props,
+ const std::shared_ptr<FileMetaData>& metadata) {
+ std::shared_ptr<::arrow::io::ReadableFileInterface> source;
+ if (memory_map) {
+    std::shared_ptr<::arrow::io::MemoryMappedFile> handle;
+    PARQUET_THROW_NOT_OK(
+        ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle));
+    source = handle;
+  } else {
+    std::shared_ptr<::arrow::io::ReadableFile> handle;
+    PARQUET_THROW_NOT_OK(
+        ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle));
+ source = handle;
+ }
+
+ return Open(source, props, metadata);
+}
+
+void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
+ contents_ = std::move(contents);
+}
+
+void ParquetFileReader::Close() {
+ if (contents_) {
+ contents_->Close();
+ }
+}
+
+std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
+ return contents_->metadata();
+}
+
+std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
+ DCHECK(i < metadata()->num_row_groups()) << "The file only has "
+ << metadata()->num_row_groups()
+                                            << " row groups, requested reader for: " << i;
+ return contents_->GetRowGroup(i);
+}
+
+// ----------------------------------------------------------------------
+// File metadata helpers
+
+std::shared_ptr<FileMetaData> ReadMetaData(
+ const std::shared_ptr<::arrow::io::ReadableFileInterface>& source) {
+ return ParquetFileReader::Open(source)->metadata();
+}
+
+// ----------------------------------------------------------------------
+// File scanner for performance testing
+
+int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
+ ParquetFileReader* reader) {
+ std::vector<int16_t> rep_levels(column_batch_size);
+ std::vector<int16_t> def_levels(column_batch_size);
+
+ int num_columns = static_cast<int>(columns.size());
+
+ // columns are not specified explicitly. Add all columns
+ if (columns.size() == 0) {
+ num_columns = reader->metadata()->num_columns();
+ columns.resize(num_columns);
+ for (int i = 0; i < num_columns; i++) {
+ columns[i] = i;
+ }
+ }
+
+ std::vector<int64_t> total_rows(num_columns, 0);
+
+ for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) {
+ auto group_reader = reader->RowGroup(r);
+ int col = 0;
+ for (auto i : columns) {
+ std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
+ size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
+ std::vector<uint8_t> values(column_batch_size * value_byte_size);
+
+ int64_t values_read = 0;
+ while (col_reader->HasNext()) {
+ total_rows[col] +=
+ ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
+ values.data(), &values_read, col_reader.get());
+ }
+ col++;
+ }
+ }
+
+ for (int i = 1; i < num_columns; ++i) {
+ if (total_rows[0] != total_rows[i]) {
+ throw ParquetException("Parquet error: Total rows among columns do not match");
+ }
+ }
+
+ return total_rows[0];
+}
+
+} // namespace parquet
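For context, code that previously included "parquet/file/reader.h" now includes "parquet/file_reader.h"; the public reader API itself is unchanged by the move. A minimal sketch of driving it end to end, assuming the OpenFile default arguments and the ScanFileContents declaration carry over from the old header (the file path and batch size are placeholders, and this snippet is not part of the patch):

    #include <iostream>
    #include <memory>
    #include <vector>

    #include "parquet/file_reader.h"

    int main() {
      // Open the file memory-mapped and parse the footer metadata.
      std::unique_ptr<parquet::ParquetFileReader> reader =
          parquet::ParquetFileReader::OpenFile("example.parquet", /*memory_map=*/true);

      // An empty column list means "scan every column"; read in batches of 256 values.
      std::vector<int> columns;
      int64_t num_rows = parquet::ScanFileContents(columns, 256, reader.get());
      std::cout << "rows: " << num_rows << std::endl;
      return 0;
    }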
diff --git a/src/parquet/file/reader.h b/src/parquet/file_reader.h
similarity index 93%
rename from src/parquet/file/reader.h
rename to src/parquet/file_reader.h
index 7558394..f751e9b 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file_reader.h
@@ -25,8 +25,8 @@
#include <string>
#include <vector>
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
+#include "parquet/column_reader.h"
+#include "parquet/metadata.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
@@ -71,6 +71,11 @@
// easily create test fixtures
// An implementation of the Contents class is defined in the .cc file
struct Contents {
+ static std::unique_ptr<Contents> Open(
+ std::unique_ptr<RandomAccessSource> source,
+ const ReaderProperties& props = default_reader_properties(),
+ const std::shared_ptr<FileMetaData>& metadata = nullptr);
+
virtual ~Contents() {}
// Perform any cleanup associated with the file contents
virtual void Close() = 0;
diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc
new file mode 100644
index 0000000..87ee4f6
--- /dev/null
+++ b/src/parquet/file_writer.cc
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_writer.h"
+
+#include "parquet/column_writer.h"
+#include "parquet/schema-internal.h"
+#include "parquet/schema.h"
+#include "parquet/thrift.h"
+#include "parquet/util/memory.h"
+
+using arrow::MemoryPool;
+
+using parquet::schema::GroupNode;
+using parquet::schema::SchemaFlattener;
+
+namespace parquet {
+
+// FIXME: copied from file_reader.cc
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// ----------------------------------------------------------------------
+// RowGroupWriter public API
+
+RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
+ : contents_(std::move(contents)) {}
+
+void RowGroupWriter::Close() {
+ if (contents_) {
+ contents_->Close();
+ }
+}
+
+ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
+
+int RowGroupWriter::current_column() { return contents_->current_column(); }
+
+int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
+
+int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
+
+// ----------------------------------------------------------------------
+// RowGroupSerializer
+
+// RowGroupWriter::Contents implementation for the Parquet file specification
+class RowGroupSerializer : public RowGroupWriter::Contents {
+ public:
+ RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
+ const WriterProperties* properties)
+ : sink_(sink),
+ metadata_(metadata),
+ properties_(properties),
+ total_bytes_written_(0),
+ closed_(false),
+ current_column_index_(0),
+ num_rows_(-1) {}
+
+ int num_columns() const override { return metadata_->num_columns(); }
+
+ int64_t num_rows() const override {
+ if (current_column_writer_) {
+ CheckRowsWritten();
+ }
+ return num_rows_ < 0 ? 0 : num_rows_;
+ }
+
+ ColumnWriter* NextColumn() override {
+ if (current_column_writer_) {
+ CheckRowsWritten();
+ }
+
+ // Throws an error if more columns are being written
+ auto col_meta = metadata_->NextColumnChunk();
+
+ if (current_column_writer_) {
+ total_bytes_written_ += current_column_writer_->Close();
+ }
+
+ ++current_column_index_;
+
+ const ColumnDescriptor* column_descr = col_meta->descr();
+ std::unique_ptr<PageWriter> pager =
+ PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta,
+ properties_->memory_pool());
+ current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
+ return current_column_writer_.get();
+ }
+
+ int current_column() const override { return metadata_->current_column(); }
+
+ void Close() override {
+ if (!closed_) {
+ closed_ = true;
+
+ if (current_column_writer_) {
+ CheckRowsWritten();
+ total_bytes_written_ += current_column_writer_->Close();
+ current_column_writer_.reset();
+ }
+
+ // Ensures all columns have been written
+ metadata_->Finish(total_bytes_written_);
+ }
+ }
+
+ private:
+ OutputStream* sink_;
+ mutable RowGroupMetaDataBuilder* metadata_;
+ const WriterProperties* properties_;
+ int64_t total_bytes_written_;
+ bool closed_;
+ int current_column_index_;
+ mutable int64_t num_rows_;
+
+ void CheckRowsWritten() const {
+ int64_t current_rows = current_column_writer_->rows_written();
+ if (num_rows_ < 0) {
+ num_rows_ = current_rows;
+ metadata_->set_num_rows(current_rows);
+ } else if (num_rows_ != current_rows) {
+ std::stringstream ss;
+ ss << "Column " << current_column_index_ << " had " << current_rows
+ << " while previous column had " << num_rows_;
+ throw ParquetException(ss.str());
+ }
+ }
+
+ std::shared_ptr<ColumnWriter> current_column_writer_;
+};
+
+// ----------------------------------------------------------------------
+// FileSerializer
+
+// An implementation of ParquetFileWriter::Contents that deals with the Parquet
+// file structure, Thrift serialization, and other internal matters
+
+class FileSerializer : public ParquetFileWriter::Contents {
+ public:
+ static std::unique_ptr<ParquetFileWriter::Contents> Open(
+ const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+ std::unique_ptr<ParquetFileWriter::Contents> result(
+ new FileSerializer(sink, schema, properties, key_value_metadata));
+
+ return result;
+ }
+
+ void Close() override {
+ if (is_open_) {
+ if (row_group_writer_) {
+ num_rows_ += row_group_writer_->num_rows();
+ row_group_writer_->Close();
+ }
+ row_group_writer_.reset();
+
+ // Write magic bytes and metadata
+ WriteMetaData();
+
+ sink_->Close();
+ is_open_ = false;
+ }
+ }
+
+ int num_columns() const override { return schema_.num_columns(); }
+
+ int num_row_groups() const override { return num_row_groups_; }
+
+ int64_t num_rows() const override { return num_rows_; }
+
+ const std::shared_ptr<WriterProperties>& properties() const override {
+ return properties_;
+ }
+
+ RowGroupWriter* AppendRowGroup() override {
+ if (row_group_writer_) {
+ row_group_writer_->Close();
+ }
+ num_row_groups_++;
+ auto rg_metadata = metadata_->AppendRowGroup();
+ std::unique_ptr<RowGroupWriter::Contents> contents(
+ new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
+ row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
+ return row_group_writer_.get();
+ }
+
+ ~FileSerializer() {
+ try {
+ Close();
+ } catch (...) {
+ }
+ }
+
+ private:
+ FileSerializer(const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<GroupNode>& schema,
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
+ : ParquetFileWriter::Contents(schema, key_value_metadata),
+ sink_(sink),
+ is_open_(true),
+ properties_(properties),
+ num_row_groups_(0),
+ num_rows_(0),
+ metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
+ StartFile();
+ }
+
+ std::shared_ptr<OutputStream> sink_;
+ bool is_open_;
+ const std::shared_ptr<WriterProperties> properties_;
+ int num_row_groups_;
+ int64_t num_rows_;
+ std::unique_ptr<FileMetaDataBuilder> metadata_;
+ std::unique_ptr<RowGroupWriter> row_group_writer_;
+
+ void StartFile() {
+ // Parquet files always start with PAR1
+ sink_->Write(PARQUET_MAGIC, 4);
+ }
+
+ void WriteMetaData() {
+ // Write MetaData
+ uint32_t metadata_len = static_cast<uint32_t>(sink_->Tell());
+
+ // Get a FileMetaData
+ auto metadata = metadata_->Finish();
+ metadata->WriteTo(sink_.get());
+ metadata_len = static_cast<uint32_t>(sink_->Tell()) - metadata_len;
+
+ // Write Footer
+ sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
+ sink_->Write(PARQUET_MAGIC, 4);
+ }
+};
+
+// ----------------------------------------------------------------------
+// ParquetFileWriter public API
+
+ParquetFileWriter::ParquetFileWriter() {}
+
+ParquetFileWriter::~ParquetFileWriter() {
+ try {
+ Close();
+ } catch (...) {
+ }
+}
+
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ const std::shared_ptr<GroupNode>& schema,
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+ return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
+ key_value_metadata);
+}
+
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+ const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<schema::GroupNode>& schema,
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+ auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
+ std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
+ result->Open(std::move(contents));
+ return result;
+}
+
+const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
+
+const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
+ return contents_->schema()->Column(i);
+}
+
+int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
+
+int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
+
+int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
+
+const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
+ const {
+ return contents_->key_value_metadata();
+}
+
+void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
+ contents_ = std::move(contents);
+}
+
+void ParquetFileWriter::Close() {
+ if (contents_) {
+ contents_->Close();
+ contents_.reset();
+ }
+}
+
+RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
+ return contents_->AppendRowGroup();
+}
+
+RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
+ return AppendRowGroup();
+}
+
+const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
+ return contents_->properties();
+}
+
+} // namespace parquet
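Similarly, writers now include "parquet/file_writer.h". A rough sketch of the write path through the consolidated API, assuming the InMemoryOutputStream sink from util/memory.h and the Int32Writer typedef from column_writer.h (neither is introduced by this patch; values are placeholders):

    #include <cstdint>
    #include <memory>

    #include "parquet/column_writer.h"
    #include "parquet/file_writer.h"
    #include "parquet/schema.h"
    #include "parquet/util/memory.h"

    int main() {
      using parquet::Repetition;
      using parquet::Type;
      using parquet::schema::GroupNode;
      using parquet::schema::PrimitiveNode;

      // Schema with a single required INT32 column named "x".
      auto root = GroupNode::Make(
          "schema", Repetition::REQUIRED,
          {PrimitiveNode::Make("x", Repetition::REQUIRED, Type::INT32)});

      auto sink = std::make_shared<parquet::InMemoryOutputStream>();
      std::unique_ptr<parquet::ParquetFileWriter> writer =
          parquet::ParquetFileWriter::Open(
              sink, std::static_pointer_cast<GroupNode>(root));

      // One row group, one column, three values; levels are null for a required column.
      parquet::RowGroupWriter* rg_writer = writer->AppendRowGroup();
      auto* int_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
      int32_t values[] = {1, 2, 3};
      int_writer->WriteBatch(3, nullptr, nullptr, values);

      // Writes the footer metadata and the trailing PAR1 magic.
      writer->Close();
      return 0;
    }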
diff --git a/src/parquet/file/writer.h b/src/parquet/file_writer.h
similarity index 98%
rename from src/parquet/file/writer.h
rename to src/parquet/file_writer.h
index 844aa16..f165261 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file_writer.h
@@ -21,7 +21,7 @@
#include <cstdint>
#include <memory>
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/util/memory.h"
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/metadata-test.cc
similarity index 99%
rename from src/parquet/file/file-metadata-test.cc
rename to src/parquet/metadata-test.cc
index 49cba3a..b20293b 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/metadata-test.cc
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include "parquet/metadata.h"
#include <gtest/gtest.h>
-#include "parquet/file/metadata.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/types.h"
diff --git a/src/parquet/file/metadata.cc b/src/parquet/metadata.cc
similarity index 99%
rename from src/parquet/file/metadata.cc
rename to src/parquet/metadata.cc
index 9f1cdd7..1c7db86 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/metadata.cc
@@ -20,7 +20,7 @@
#include <vector>
#include "parquet/exception.h"
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
#include "parquet/schema-internal.h"
#include "parquet/schema.h"
#include "parquet/thrift.h"
diff --git a/src/parquet/file/metadata.h b/src/parquet/metadata.h
similarity index 100%
rename from src/parquet/file/metadata.h
rename to src/parquet/metadata.h
diff --git a/src/parquet/file/printer.cc b/src/parquet/printer.cc
similarity index 99%
rename from src/parquet/file/printer.cc
rename to src/parquet/printer.cc
index 727552d..88b5528 100644
--- a/src/parquet/file/printer.cc
+++ b/src/parquet/printer.cc
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "parquet/file/printer.h"
+#include "parquet/printer.h"
#include <string>
#include <vector>
diff --git a/src/parquet/file/printer.h b/src/parquet/printer.h
similarity index 97%
rename from src/parquet/file/printer.h
rename to src/parquet/printer.h
index a18af4a..3b82882 100644
--- a/src/parquet/file/printer.h
+++ b/src/parquet/printer.h
@@ -25,7 +25,7 @@
#include <string>
#include <vector>
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
namespace parquet {
diff --git a/src/parquet/properties-test.cc b/src/parquet/properties-test.cc
index 4a063c1..c740b59 100644
--- a/src/parquet/properties-test.cc
+++ b/src/parquet/properties-test.cc
@@ -19,7 +19,7 @@
#include <string>
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
#include "parquet/properties.h"
namespace parquet {
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index cefa452..c536fdc 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -27,9 +27,8 @@
#include "parquet/column_reader.h"
#include "parquet/column_scanner.h"
-#include "parquet/file/printer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/printer.h"
#include "parquet/util/memory.h"
using std::string;
@@ -204,7 +203,7 @@
TEST_F(TestLocalFile, FileClosedOnDestruction) {
bool close_called = false;
{
- auto contents = SerializedFile::Open(
+ auto contents = ParquetFileReader::Contents::Open(
std::unique_ptr<RandomAccessSource>(new HelperFileClosed(handle, &close_called)));
std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
result->Open(std::move(contents));
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index fc905b5..e5992c6 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -26,8 +26,8 @@
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
-#include "parquet/file/reader.h"
-#include "parquet/file/writer.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/test-specialization.h"
@@ -194,8 +194,9 @@
}
template <typename TestType>
-typename std::vector<typename TestType::c_type> TestRowGroupStatistics<
- TestType>::GetDeepCopy(const std::vector<typename TestType::c_type>& values) {
+typename std::vector<typename TestType::c_type>
+TestRowGroupStatistics<TestType>::GetDeepCopy(
+ const std::vector<typename TestType::c_type>& values) {
return values;
}
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index 0698652..ac6d0a1 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -79,8 +79,7 @@
explicit MockPageReader(const vector<shared_ptr<Page>>& pages)
: pages_(pages), page_index_(0) {}
- // Implement the PageReader interface
- virtual shared_ptr<Page> NextPage() {
+ shared_ptr<Page> NextPage() override {
if (page_index_ == static_cast<int>(pages_.size())) {
// EOS to consumer
return shared_ptr<Page>(nullptr);
@@ -88,6 +87,9 @@
return pages_[page_index_++];
}
+ // No-op
+ void set_max_page_header_size(uint32_t size) override {}
+
private:
vector<shared_ptr<Page>> pages_;
int page_index_;