// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/parquet/writer.h"
#include <algorithm>
#include <vector>
#include "arrow/array.h"
#include "arrow/column.h"
#include "arrow/table.h"
#include "arrow/types/construct.h"
#include "arrow/types/primitive.h"
#include "arrow/types/string.h"
#include "arrow/parquet/schema.h"
#include "arrow/parquet/utils.h"
#include "arrow/util/status.h"
using parquet::ParquetFileWriter;
using parquet::ParquetVersion;
using parquet::schema::GroupNode;
namespace arrow {
namespace parquet {
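// Private implementation ("pimpl") holding the parquet-cpp file writer and the
// scratch buffers used while converting Arrow arrays into Parquet column chunks.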
class FileWriter::Impl {
public:
Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
Status NewRowGroup(int64_t chunk_size);
template <typename ParquetType, typename ArrowType>
Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data,
int64_t offset, int64_t length);
  // TODO(uwe): Same code as in reader.cc; the only difference is the name of the
  // temporary buffer.
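  // can_copy_ptr is true when values of InType can be handed to parquet-cpp as
  // OutType without conversion: either the two types are identical, or both are
  // integral types of the same width (e.g. uint32_t values reinterpreted as int32_t).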
template <typename InType, typename OutType>
struct can_copy_ptr {
static constexpr bool value =
std::is_same<InType, OutType>::value ||
(std::is_integral<InType>{} && std::is_integral<OutType>{} &&
(sizeof(InType) == sizeof(OutType)));
};
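  // Zero-copy case: the Arrow buffer is simply reinterpreted as the Parquet type.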
template <typename InType, typename OutType,
typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) {
*out_ptr = reinterpret_cast<const OutType*>(in_ptr);
return Status::OK();
}
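  // Conversion case: the values are copied (and widened if necessary) into
  // data_buffer_ before being handed to parquet-cpp.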
template <typename InType, typename OutType,
typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
Status ConvertPhysicalType(
const InType* in_ptr, int64_t length, const OutType** out_ptr) {
RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType)));
OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data());
std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
*out_ptr = mutable_out_ptr;
return Status::OK();
}
Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
Status WriteFlatColumnChunk(const StringArray* data, int64_t offset, int64_t length);
Status Close();
virtual ~Impl() {}
private:
friend class FileWriter;
MemoryPool* pool_;
// Buffer used for storing the data of an array converted to the physical type
// as expected by parquet-cpp.
PoolBuffer data_buffer_;
PoolBuffer def_levels_buffer_;
std::unique_ptr<::parquet::ParquetFileWriter> writer_;
::parquet::RowGroupWriter* row_group_writer_;
};
FileWriter::Impl::Impl(
MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
: pool_(pool),
data_buffer_(pool),
writer_(std::move(writer)),
row_group_writer_(nullptr) {}
Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
return Status::OK();
}
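// Writes a contiguous slice of a primitive array as one Parquet column chunk.
// max_definition_level() == 0 corresponds to a required (non-nullable) column, so
// values are written without definition levels; max_definition_level() == 1
// corresponds to an optional column, where a definition level of 1 marks a present
// value and 0 marks a null.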
template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
const PrimitiveArray* data, int64_t offset, int64_t length) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
DCHECK((offset + length) <= data->length());
auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
auto writer =
reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
if (writer->descr()->max_definition_level() == 0) {
// no nulls, just dump the data
const ParquetCType* data_writer_ptr = nullptr;
RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
data_ptr, length, &data_writer_ptr)));
PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr));
} else if (writer->descr()->max_definition_level() == 1) {
RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
int16_t* def_levels_ptr =
reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
if (data->null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + length, 1);
const ParquetCType* data_writer_ptr = nullptr;
RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
data_ptr, length, &data_writer_ptr)));
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
} else {
RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
      int64_t buffer_idx = 0;
      for (int64_t i = 0; i < length; i++) {
if (data->IsNull(offset + i)) {
def_levels_ptr[i] = 0;
} else {
def_levels_ptr[i] = 1;
buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]);
}
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
}
} else {
return Status::NotImplemented("no support for max definition level > 1 yet");
}
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
// This specialization seems quite similar, but it differs in two significant points:
// * The offset is applied to the pointer as late as possible, since boolean values
//   are addressed at sub-byte granularity.
// * Arrow stores booleans as a bitmap, so we cannot use std::copy to convert from
//   ArrowType::c_type to ParquetType::c_type.
template <>
Status FileWriter::Impl::TypedWriteBatch<::parquet::BooleanType, BooleanType>(
::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
int64_t length) {
DCHECK((offset + length) <= data->length());
RETURN_NOT_OK(data_buffer_.Resize(length));
auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::BooleanType>*>(
column_writer);
if (writer->descr()->max_definition_level() == 0) {
// no nulls, just dump the data
for (int64_t i = 0; i < length; i++) {
buffer_ptr[i] = util::get_bit(data_ptr, offset + i);
}
PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr));
} else if (writer->descr()->max_definition_level() == 1) {
RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
int16_t* def_levels_ptr =
reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
if (data->null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + length, 1);
for (int64_t i = 0; i < length; i++) {
buffer_ptr[i] = util::get_bit(data_ptr, offset + i);
}
// TODO(PARQUET-644): write boolean values as a packed bitmap
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
} else {
      int64_t buffer_idx = 0;
      for (int64_t i = 0; i < length; i++) {
if (data->IsNull(offset + i)) {
def_levels_ptr[i] = 0;
} else {
def_levels_ptr[i] = 1;
buffer_ptr[buffer_idx++] = util::get_bit(data_ptr, offset + i);
}
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
}
} else {
return Status::NotImplemented("no support for max definition level > 1 yet");
}
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
Status FileWriter::Impl::Close() {
if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
PARQUET_CATCH_NOT_OK(writer_->Close());
return Status::OK();
}
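// Dispatches on the Arrow type enum to the matching TypedWriteBatch instantiation.
// Arrow integer types narrower than 32 bits are widened to Parquet's INT32 physical
// type, since Parquet has no smaller integer physical type. UINT32 is handled outside
// the macro because its physical type depends on the Parquet format version.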
#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
case Type::ENUM: \
return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
break;
Status FileWriter::Impl::WriteFlatColumnChunk(
const PrimitiveArray* data, int64_t offset, int64_t length) {
::parquet::ColumnWriter* writer;
PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
switch (data->type_enum()) {
TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType)
TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type)
case Type::UINT32:
if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
        // Parquet 1.0 readers cannot interpret the UINT_32 logical type. Thus we
        // need to use the larger Int64Type to store the values losslessly.
return TypedWriteBatch<::parquet::Int64Type, UInt32Type>(
writer, data, offset, length);
} else {
return TypedWriteBatch<::parquet::Int32Type, UInt32Type>(
writer, data, offset, length);
}
TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type)
TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
default:
return Status::NotImplemented(data->type()->ToString());
}
}
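// Writes a slice of a StringArray as a Parquet BYTE_ARRAY column chunk. The UTF-8
// bytes stay in the Arrow values buffer; only (length, pointer) ByteArray descriptors
// are materialized into data_buffer_.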
Status FileWriter::Impl::WriteFlatColumnChunk(
const StringArray* data, int64_t offset, int64_t length) {
::parquet::ColumnWriter* column_writer;
PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
DCHECK((offset + length) <= data->length());
RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(::parquet::ByteArray)));
auto buffer_ptr = reinterpret_cast<::parquet::ByteArray*>(data_buffer_.mutable_data());
  auto values = std::dynamic_pointer_cast<PrimitiveArray>(data->values());
  DCHECK(values != nullptr);
  auto data_ptr = reinterpret_cast<const uint8_t*>(values->data()->data());
auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::ByteArrayType>*>(
column_writer);
if (writer->descr()->max_definition_level() > 0) {
RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
}
int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) {
// no nulls, just dump the data
for (int64_t i = 0; i < length; i++) {
      buffer_ptr[i] = ::parquet::ByteArray(
          data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
}
if (writer->descr()->max_definition_level() > 0) {
std::fill(def_levels_ptr, def_levels_ptr + length, 1);
}
PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
} else if (writer->descr()->max_definition_level() == 1) {
    int64_t buffer_idx = 0;
for (int64_t i = 0; i < length; i++) {
if (data->IsNull(offset + i)) {
def_levels_ptr[i] = 0;
} else {
def_levels_ptr[i] = 1;
buffer_ptr[buffer_idx++] = ::parquet::ByteArray(
data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
}
}
PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
} else {
return Status::NotImplemented("no support for max definition level > 1 yet");
}
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
FileWriter::FileWriter(
MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
: impl_(new FileWriter::Impl(pool, std::move(writer))) {}
Status FileWriter::NewRowGroup(int64_t chunk_size) {
return impl_->NewRowGroup(chunk_size);
}
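// Dispatches to the appropriate Impl overload. When length is -1, the full array
// length is used.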
Status FileWriter::WriteFlatColumnChunk(
const Array* array, int64_t offset, int64_t length) {
int64_t real_length = length;
if (length == -1) { real_length = array->length(); }
if (array->type_enum() == Type::STRING) {
auto string_array = dynamic_cast<const StringArray*>(array);
DCHECK(string_array);
return impl_->WriteFlatColumnChunk(string_array, offset, real_length);
} else {
auto primitive_array = dynamic_cast<const PrimitiveArray*>(array);
if (!primitive_array) {
return Status::NotImplemented("Table must consist of PrimitiveArray instances");
}
return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length);
}
}
Status FileWriter::Close() {
return impl_->Close();
}
MemoryPool* FileWriter::memory_pool() const {
return impl_->pool_;
}
FileWriter::~FileWriter() {}
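// Writes an Arrow table with flat (non-nested) columns to a Parquet file, splitting
// the rows into row groups of at most chunk_size rows.
//
// Usage sketch (assumes `table`, `pool`, and an OutputStream `sink` exist elsewhere,
// and that parquet-cpp's default_writer_properties() factory is available):
//
//   std::shared_ptr<::parquet::WriterProperties> props =
//       ::parquet::default_writer_properties();
//   arrow::Status s =
//       arrow::parquet::WriteFlatTable(table.get(), pool, sink, 1 << 20, props);
//   if (!s.ok()) { /* handle error */ }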
Status WriteFlatTable(const Table* table, MemoryPool* pool,
const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size,
const std::shared_ptr<::parquet::WriterProperties>& properties) {
std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
RETURN_NOT_OK(
ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema());
std::unique_ptr<ParquetFileWriter> parquet_writer =
ParquetFileWriter::Open(sink, schema_node, properties);
FileWriter writer(pool, std::move(parquet_writer));
// TODO(ARROW-232) Support writing chunked arrays.
for (int i = 0; i < table->num_columns(); i++) {
if (table->column(i)->data()->num_chunks() != 1) {
return Status::NotImplemented("No support for writing chunked arrays yet.");
}
}
for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {
int64_t offset = chunk * chunk_size;
int64_t size = std::min(chunk_size, table->num_rows() - offset);
RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
for (int i = 0; i < table->num_columns(); i++) {
std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size),
PARQUET_IGNORE_NOT_OK(writer.Close()));
}
}
return writer.Close();
}
} // namespace parquet
} // namespace arrow