blob: dded5fa166b7715b50799d6f2e37ba6cd93efb25 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cassert>
#include <fstream>
#include <iostream>
#include <memory>
#include <reader_writer.h>
/*
* This example describes writing and reading Parquet Files in C++ and serves as a
* reference to the API.
* The file contains all the physical data types supported by Parquet.
* This example uses the RowGroupWriter API that supports writing RowGroups based on a
*certain size
**/
/* Parquet is a structured columnar file format
* Parquet File = "Parquet data" + "Parquet Metadata"
* "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
* columnar layout
* "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
* Columns
* "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
* complex (nested) type (internal nodes)
* For specific details, please refer the format here:
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
**/
constexpr int NUM_ROWS = 2500000;
constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024; // 16 MB
const char PARQUET_FILENAME[] = "parquet_cpp_example2.parquet";
int main(int argc, char** argv) {
/**********************************************************************************
PARQUET WRITER EXAMPLE
**********************************************************************************/
// parquet::REQUIRED fields do not need definition and repetition level values
// parquet::OPTIONAL fields require only definition level values
// parquet::REPEATED fields require both definition and repetition level values
try {
// Create a local file output stream instance.
using FileClass = ::arrow::io::FileOutputStream;
std::shared_ptr<FileClass> out_file;
PARQUET_THROW_NOT_OK(FileClass::Open(PARQUET_FILENAME, &out_file));
// Setup the parquet schema
std::shared_ptr<GroupNode> schema = SetupSchema();
// Add writer properties
parquet::WriterProperties::Builder builder;
builder.compression(parquet::Compression::SNAPPY);
std::shared_ptr<parquet::WriterProperties> props = builder.build();
// Create a ParquetFileWriter instance
std::shared_ptr<parquet::ParquetFileWriter> file_writer =
parquet::ParquetFileWriter::Open(out_file, schema, props);
// Append a BufferedRowGroup to keep the RowGroup open until a certain size
parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();
int num_columns = file_writer->num_columns();
std::vector<int64_t> buffered_values_estimate(num_columns, 0);
for (int i = 0; i < NUM_ROWS; i++) {
int64_t estimated_bytes = 0;
// Get the estimated size of the values that are not written to a page yet
for (int n = 0; n < num_columns; n++) {
estimated_bytes += buffered_values_estimate[n];
}
// We need to consider the compressed pages
// as well as the values that are not compressed yet
if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
estimated_bytes) > ROW_GROUP_SIZE) {
rg_writer->Close();
std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
rg_writer = file_writer->AppendBufferedRowGroup();
}
int col_id = 0;
// Write the Bool column
parquet::BoolWriter* bool_writer =
static_cast<parquet::BoolWriter*>(rg_writer->column(col_id));
bool bool_value = ((i % 2) == 0) ? true : false;
bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value);
buffered_values_estimate[col_id] = bool_writer->EstimatedBufferedValueBytes();
// Write the Int32 column
col_id++;
parquet::Int32Writer* int32_writer =
static_cast<parquet::Int32Writer*>(rg_writer->column(col_id));
int32_t int32_value = i;
int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value);
buffered_values_estimate[col_id] = int32_writer->EstimatedBufferedValueBytes();
// Write the Int64 column. Each row has repeats twice.
col_id++;
parquet::Int64Writer* int64_writer =
static_cast<parquet::Int64Writer*>(rg_writer->column(col_id));
int64_t int64_value1 = 2 * i;
int16_t definition_level = 1;
int16_t repetition_level = 0;
int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value1);
int64_t int64_value2 = (2 * i + 1);
repetition_level = 1; // start of a new record
int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value2);
buffered_values_estimate[col_id] = int64_writer->EstimatedBufferedValueBytes();
// Write the INT96 column.
col_id++;
parquet::Int96Writer* int96_writer =
static_cast<parquet::Int96Writer*>(rg_writer->column(col_id));
parquet::Int96 int96_value;
int96_value.value[0] = i;
int96_value.value[1] = i + 1;
int96_value.value[2] = i + 2;
int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value);
buffered_values_estimate[col_id] = int96_writer->EstimatedBufferedValueBytes();
// Write the Float column
col_id++;
parquet::FloatWriter* float_writer =
static_cast<parquet::FloatWriter*>(rg_writer->column(col_id));
float float_value = static_cast<float>(i) * 1.1f;
float_writer->WriteBatch(1, nullptr, nullptr, &float_value);
buffered_values_estimate[col_id] = float_writer->EstimatedBufferedValueBytes();
// Write the Double column
col_id++;
parquet::DoubleWriter* double_writer =
static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id));
double double_value = i * 1.1111111;
double_writer->WriteBatch(1, nullptr, nullptr, &double_value);
buffered_values_estimate[col_id] = double_writer->EstimatedBufferedValueBytes();
// Write the ByteArray column. Make every alternate values NULL
col_id++;
parquet::ByteArrayWriter* ba_writer =
static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
parquet::ByteArray ba_value;
char hello[FIXED_LENGTH] = "parquet";
hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
if (i % 2 == 0) {
int16_t definition_level = 1;
ba_value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
ba_value.len = FIXED_LENGTH;
ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
} else {
int16_t definition_level = 0;
ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
}
buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
// Write the FixedLengthByteArray column
col_id++;
parquet::FixedLenByteArrayWriter* flba_writer =
static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->column(col_id));
parquet::FixedLenByteArray flba_value;
char v = static_cast<char>(i);
char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value);
buffered_values_estimate[col_id] = flba_writer->EstimatedBufferedValueBytes();
}
// Close the RowGroupWriter
rg_writer->Close();
// Close the ParquetFileWriter
file_writer->Close();
// Write the bytes to file
DCHECK(out_file->Close().ok());
} catch (const std::exception& e) {
std::cerr << "Parquet write error: " << e.what() << std::endl;
return -1;
}
/**********************************************************************************
PARQUET READER EXAMPLE
**********************************************************************************/
try {
// Create a ParquetReader instance
std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);
// Get the File MetaData
std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
int num_row_groups = file_metadata->num_row_groups();
// Get the number of Columns
int num_columns = file_metadata->num_columns();
assert(num_columns == 8);
std::vector<int> col_row_counts(num_columns, 0);
// Iterate over all the RowGroups in the file
for (int r = 0; r < num_row_groups; ++r) {
// Get the RowGroup Reader
std::shared_ptr<parquet::RowGroupReader> row_group_reader =
parquet_reader->RowGroup(r);
assert(row_group_reader->metadata()->total_byte_size() < ROW_GROUP_SIZE);
int64_t values_read = 0;
int64_t rows_read = 0;
int16_t definition_level;
int16_t repetition_level;
std::shared_ptr<parquet::ColumnReader> column_reader;
int col_id = 0;
// Get the Column Reader for the boolean column
column_reader = row_group_reader->Column(col_id);
parquet::BoolReader* bool_reader =
static_cast<parquet::BoolReader*>(column_reader.get());
// Read all the rows in the column
while (bool_reader->HasNext()) {
bool value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
bool expected_value = ((col_row_counts[col_id] % 2) == 0) ? true : false;
assert(value == expected_value);
col_row_counts[col_id]++;
}
// Get the Column Reader for the Int32 column
col_id++;
column_reader = row_group_reader->Column(col_id);
parquet::Int32Reader* int32_reader =
static_cast<parquet::Int32Reader*>(column_reader.get());
// Read all the rows in the column
while (int32_reader->HasNext()) {
int32_t value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
assert(value == col_row_counts[col_id]);
col_row_counts[col_id]++;
}
// Get the Column Reader for the Int64 column
col_id++;
column_reader = row_group_reader->Column(col_id);
parquet::Int64Reader* int64_reader =
static_cast<parquet::Int64Reader*>(column_reader.get());
// Read all the rows in the column
while (int64_reader->HasNext()) {
int64_t value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level,
&value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
int64_t expected_value = col_row_counts[col_id];
assert(value == expected_value);
if ((col_row_counts[col_id] % 2) == 0) {
assert(repetition_level == 0);
} else {
assert(repetition_level == 1);
}
col_row_counts[col_id]++;
}
// Get the Column Reader for the Int96 column
col_id++;
column_reader = row_group_reader->Column(col_id);
parquet::Int96Reader* int96_reader =
static_cast<parquet::Int96Reader*>(column_reader.get());
// Read all the rows in the column
while (int96_reader->HasNext()) {
parquet::Int96 value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
parquet::Int96 expected_value;
expected_value.value[0] = col_row_counts[col_id];
expected_value.value[1] = col_row_counts[col_id] + 1;
expected_value.value[2] = col_row_counts[col_id] + 2;
for (int j = 0; j < 3; j++) {
assert(value.value[j] == expected_value.value[j]);
}
col_row_counts[col_id]++;
}
// Get the Column Reader for the Float column
col_id++;
column_reader = row_group_reader->Column(col_id);
parquet::FloatReader* float_reader =
static_cast<parquet::FloatReader*>(column_reader.get());
// Read all the rows in the column
while (float_reader->HasNext()) {
float value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
float expected_value = static_cast<float>(col_row_counts[col_id]) * 1.1f;
assert(value == expected_value);
col_row_counts[col_id]++;
}
// Get the Column Reader for the Double column
col_id++;
column_reader = row_group_reader->Column(col_id);
parquet::DoubleReader* double_reader =
static_cast<parquet::DoubleReader*>(column_reader.get());
// Read all the rows in the column
while (double_reader->HasNext()) {
double value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
double expected_value = col_row_counts[col_id] * 1.1111111;
assert(value == expected_value);
col_row_counts[col_id]++;
}
// Get the Column Reader for the ByteArray column
col_id++;
column_reader = row_group_reader->Column(col_id);
parquet::ByteArrayReader* ba_reader =
static_cast<parquet::ByteArrayReader*>(column_reader.get());
// Read all the rows in the column
while (ba_reader->HasNext()) {
parquet::ByteArray value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read =
ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// Verify the value written
char expected_value[FIXED_LENGTH] = "parquet";
expected_value[7] = static_cast<char>('0' + col_row_counts[col_id] / 100);
expected_value[8] = static_cast<char>('0' + (col_row_counts[col_id] / 10) % 10);
expected_value[9] = static_cast<char>('0' + col_row_counts[col_id] % 10);
if (col_row_counts[col_id] % 2 == 0) { // only alternate values exist
// There are no NULL values in the rows written
assert(values_read == 1);
assert(value.len == FIXED_LENGTH);
assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
assert(definition_level == 1);
} else {
// There are NULL values in the rows written
assert(values_read == 0);
assert(definition_level == 0);
}
col_row_counts[col_id]++;
}
// Get the Column Reader for the FixedLengthByteArray column
col_id++;
column_reader = row_group_reader->Column(col_id);
parquet::FixedLenByteArrayReader* flba_reader =
static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
// Read all the rows in the column
while (flba_reader->HasNext()) {
parquet::FixedLenByteArray value;
// Read one value at a time. The number of rows read is returned. values_read
// contains the number of non-null rows
rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
// Ensure only one value is read
assert(rows_read == 1);
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
char v = static_cast<char>(col_row_counts[col_id]);
char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
col_row_counts[col_id]++;
}
}
} catch (const std::exception& e) {
std::cerr << "Parquet read error: " << e.what() << std::endl;
return -1;
}
std::cout << "Parquet Writing and Reading Complete" << std::endl;
return 0;
}