// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/arrow/writer.h"
#include <algorithm>
#include <string>
#include <vector>
#include "arrow/api.h"
#include "arrow/compute/api.h"
#include "arrow/util/bit-util.h"
#include "arrow/visitor_inline.h"
#include "parquet/arrow/schema.h"
#include "parquet/util/logging.h"
using arrow::Array;
using arrow::BinaryArray;
using arrow::FixedSizeBinaryArray;
using arrow::Decimal128Array;
using arrow::BooleanArray;
using arrow::Int16Array;
using arrow::Int16Builder;
using arrow::Field;
using arrow::MemoryPool;
using arrow::NumericArray;
using arrow::PoolBuffer;
using arrow::PrimitiveArray;
using arrow::ListArray;
using arrow::Status;
using arrow::Table;
using arrow::TimeUnit;
using arrow::compute::Cast;
using arrow::compute::CastOptions;
using arrow::compute::FunctionContext;
using parquet::ParquetFileWriter;
using parquet::ParquetVersion;
using parquet::schema::GroupNode;
namespace parquet {
namespace arrow {
namespace BitUtil = ::arrow::BitUtil;
std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
static std::shared_ptr<ArrowWriterProperties> default_writer_properties =
ArrowWriterProperties::Builder().build();
return default_writer_properties;
}
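// Generates the Parquet definition and repetition levels for an Arrow array.
// The visitor walks nested ListArrays down to the leaf values, collecting
// null bitmaps, value offsets, and per-level nullability along the way.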
class LevelBuilder {
public:
explicit LevelBuilder(MemoryPool* pool)
: def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(), pool) {
def_levels_buffer_ = std::make_shared<PoolBuffer>(pool);
}
Status VisitInline(const Array& array);
template <typename T>
typename std::enable_if<std::is_base_of<::arrow::FlatArray, T>::value, Status>::type
Visit(const T& array) {
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
null_counts_.push_back(array.null_count());
values_type_ = array.type_id();
values_array_ = std::make_shared<T>(array.data());
return Status::OK();
}
Status Visit(const ListArray& array) {
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
null_counts_.push_back(array.null_count());
offsets_.push_back(array.raw_value_offsets());
min_offset_idx_ = array.value_offset(min_offset_idx_);
max_offset_idx_ = array.value_offset(max_offset_idx_);
return VisitInline(*array.values());
}
#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix) \
Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \
return Status::NotImplemented("Level generation for " #ArrowTypePrefix \
" not supported yet"); \
}
NOT_IMPLEMENTED_VISIT(Struct)
NOT_IMPLEMENTED_VISIT(Union)
NOT_IMPLEMENTED_VISIT(Dictionary)
NOT_IMPLEMENTED_VISIT(Interval)
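// Computes definition and repetition levels for `array`. On success,
// *values_array points to the leaf values array, and the slice
// [*values_offset, *values_offset + *num_values) is the range covered by the
// generated levels.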
Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
int64_t* values_offset, ::arrow::Type::type* values_type,
int64_t* num_values, int64_t* num_levels,
std::shared_ptr<Buffer>* def_levels,
std::shared_ptr<Buffer>* rep_levels,
std::shared_ptr<Array>* values_array) {
// Work downwards to extract bitmaps and offsets
min_offset_idx_ = 0;
max_offset_idx_ = array.length();
RETURN_NOT_OK(VisitInline(array));
*num_values = max_offset_idx_ - min_offset_idx_;
*values_offset = min_offset_idx_;
*values_type = values_type_;
*values_array = values_array_;
// Walk downwards to extract nullability
std::shared_ptr<Field> current_field = field;
nullable_.push_back(current_field->nullable());
while (current_field->type()->num_children() > 0) {
if (current_field->type()->num_children() > 1) {
return Status::NotImplemented(
"Fields with more than one child are not supported.");
} else {
current_field = current_field->type()->child(0);
}
nullable_.push_back(current_field->nullable());
}
// Generate the levels.
if (nullable_.size() == 1) {
// We have a PrimitiveArray
*rep_levels = nullptr;
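// Definition level 1 marks a present value, 0 a null entry.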
if (nullable_[0]) {
RETURN_NOT_OK(def_levels_buffer_->Resize(array.length() * sizeof(int16_t)));
auto def_levels_ptr =
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
if (array.null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
} else if (array.null_count() == array.length()) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
} else {
::arrow::internal::BitmapReader valid_bits_reader(
array.null_bitmap_data(), array.offset(), array.length());
for (int64_t i = 0; i < array.length(); i++) {
if (valid_bits_reader.IsSet()) {
def_levels_ptr[i] = 1;
} else {
def_levels_ptr[i] = 0;
}
valid_bits_reader.Next();
}
}
*def_levels = def_levels_buffer_;
} else {
*def_levels = nullptr;
}
*num_levels = array.length();
} else {
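// Nested list case: seed the stream with repetition level 0 for the first
// entry, then walk the list entries recursively, emitting matching
// definition and repetition levels.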
RETURN_NOT_OK(rep_levels_.Append(0));
RETURN_NOT_OK(HandleListEntries(0, 0, 0, array.length()));
std::shared_ptr<Array> def_levels_array;
RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
*def_levels = static_cast<PrimitiveArray*>(def_levels_array.get())->values();
std::shared_ptr<Array> rep_levels_array;
RETURN_NOT_OK(rep_levels_.Finish(&rep_levels_array));
*rep_levels = static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
*num_levels = rep_levels_array->length();
}
return Status::OK();
}
Status HandleList(int16_t def_level, int16_t rep_level, int64_t index) {
if (nullable_[rep_level]) {
if (null_counts_[rep_level] == 0 ||
BitUtil::GetBit(valid_bitmaps_[rep_level], index + array_offsets_[rep_level])) {
return HandleNonNullList(static_cast<int16_t>(def_level + 1), rep_level, index);
} else {
return def_levels_.Append(def_level);
}
} else {
return HandleNonNullList(def_level, rep_level, index);
}
}
Status HandleNonNullList(int16_t def_level, int16_t rep_level, int64_t index) {
int32_t inner_offset = offsets_[rep_level][index];
int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
int64_t recursion_level = rep_level + 1;
if (inner_length == 0) {
return def_levels_.Append(def_level);
}
if (recursion_level < static_cast<int64_t>(offsets_.size())) {
return HandleListEntries(static_cast<int16_t>(def_level + 1),
static_cast<int16_t>(rep_level + 1), inner_offset,
inner_length);
} else {
// We have reached the leaf: primitive list, handle remaining nullables
for (int64_t i = 0; i < inner_length; i++) {
if (i > 0) {
RETURN_NOT_OK(rep_levels_.Append(static_cast<int16_t>(rep_level + 1)));
}
if (nullable_[recursion_level] &&
((null_counts_[recursion_level] == 0) ||
BitUtil::GetBit(valid_bitmaps_[recursion_level],
inner_offset + i + array_offsets_[recursion_level]))) {
RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 2)));
} else {
// This can be produced in two cases:
// * elements are nullable and this one is null
//   (i.e. max_def_level = def_level + 2)
// * elements are non-nullable (i.e. max_def_level = def_level + 1)
RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
}
}
return Status::OK();
}
}
Status HandleListEntries(int16_t def_level, int16_t rep_level, int64_t offset,
int64_t length) {
for (int64_t i = 0; i < length; i++) {
if (i > 0) {
RETURN_NOT_OK(rep_levels_.Append(rep_level));
}
RETURN_NOT_OK(HandleList(def_level, rep_level, offset + i));
}
return Status::OK();
}
private:
Int16Builder def_levels_;
std::shared_ptr<PoolBuffer> def_levels_buffer_;
Int16Builder rep_levels_;
std::vector<int64_t> null_counts_;
std::vector<const uint8_t*> valid_bitmaps_;
std::vector<const int32_t*> offsets_;
std::vector<int32_t> array_offsets_;
std::vector<bool> nullable_;
int64_t min_offset_idx_;
int64_t max_offset_idx_;
::arrow::Type::type values_type_;
std::shared_ptr<Array> values_array_;
};
Status LevelBuilder::VisitInline(const Array& array) {
return VisitArrayInline(array, this);
}
class FileWriter::Impl {
public:
Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
Status NewRowGroup(int64_t chunk_size);
template <typename ParquetType, typename ArrowType>
Status TypedWriteBatch(ColumnWriter* column_writer, const std::shared_ptr<Array>& data,
int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels);
Status WriteTimestamps(ColumnWriter* column_writer, const std::shared_ptr<Array>& data,
int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels);
Status WriteTimestampsCoerce(ColumnWriter* column_writer,
const std::shared_ptr<Array>& data, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels);
template <typename ParquetType, typename ArrowType>
Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
const ArrowType& type, int64_t num_values,
int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels,
const typename ArrowType::c_type* data_ptr);
template <typename ParquetType, typename ArrowType>
Status WriteNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
const ArrowType& type, int64_t num_values, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels,
const uint8_t* valid_bits, int64_t valid_bits_offset,
const typename ArrowType::c_type* data_ptr);
Status WriteColumnChunk(const Array& data);
Status Close();
const WriterProperties& properties() const { return *writer_->properties(); }
virtual ~Impl() {}
private:
friend class FileWriter;
MemoryPool* pool_;
// Buffer used for storing the data of an array converted to the physical type
// as expected by parquet-cpp.
PoolBuffer data_buffer_;
std::unique_ptr<ParquetFileWriter> writer_;
RowGroupWriter* row_group_writer_;
std::shared_ptr<ArrowWriterProperties> arrow_properties_;
bool closed_;
};
FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
: pool_(pool),
data_buffer_(pool),
writer_(std::move(writer)),
row_group_writer_(nullptr),
arrow_properties_(arrow_properties),
closed_(false) {}
Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
if (row_group_writer_ != nullptr) {
PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
}
PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
return Status::OK();
}
// ----------------------------------------------------------------------
// Column type specialization
template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
const std::shared_ptr<Array>& array,
int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
using ArrowCType = typename ArrowType::c_type;
const auto& data = static_cast<const PrimitiveArray&>(*array);
auto data_ptr =
reinterpret_cast<const ArrowCType*>(data.values()->data()) + data.offset();
auto writer = reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
// no nulls, just dump the data
RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(
writer, static_cast<const ArrowType&>(*array->type()), array->length(),
num_levels, def_levels, rep_levels, data_ptr)));
} else {
const uint8_t* valid_bits = data.null_bitmap_data();
RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
writer, static_cast<const ArrowType&>(*array->type()), data.length(), num_levels,
def_levels, rep_levels, valid_bits, data.offset(), data_ptr)));
}
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::WriteNonNullableBatch(
TypedColumnWriter<ParquetType>* writer, const ArrowType& type, int64_t num_values,
int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
const typename ArrowType::c_type* data_ptr) {
using ParquetCType = typename ParquetType::c_type;
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
return Status::OK();
}
template <>
Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
int64_t num_values, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels, const int64_t* data_ptr) {
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
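// Date64 values are milliseconds since the UNIX epoch; Parquet DATE expects
// days, hence the division by 86400000 (milliseconds per day).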
for (int64_t i = 0; i < num_values; i++) {
buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
return Status::OK();
}
template <>
Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
int64_t num_values, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels, const int32_t* data_ptr) {
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
if (type.unit() == TimeUnit::SECOND) {
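// Parquet has no second-resolution TIME type, so widen seconds to
// milliseconds.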
for (int64_t i = 0; i < num_values; i++) {
buffer_ptr[i] = data_ptr[i] * 1000;
}
} else {
std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
return Status::OK();
}
#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
template <> \
Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>( \
TypedColumnWriter<ParquetType> * writer, const ArrowType& type, \
int64_t num_values, int64_t num_levels, const int16_t* def_levels, \
const int16_t* rep_levels, const CType* data_ptr) { \
PARQUET_CATCH_NOT_OK( \
writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \
return Status::OK(); \
}
NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
const ArrowType& type, int64_t num_values,
int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels,
const uint8_t* valid_bits,
int64_t valid_bits_offset,
const typename ArrowType::c_type* data_ptr) {
using ParquetCType = typename ParquetType::c_type;
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
num_values);
for (int64_t i = 0; i < num_values; i++) {
if (valid_bits_reader.IsSet()) {
buffer_ptr[i] = static_cast<ParquetCType>(data_ptr[i]);
}
valid_bits_reader.Next();
}
PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
return Status::OK();
}
template <>
Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
int64_t num_values, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
const int64_t* data_ptr) {
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
num_values);
for (int64_t i = 0; i < num_values; i++) {
if (valid_bits_reader.IsSet()) {
// Convert from milliseconds into days since the epoch
buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
}
valid_bits_reader.Next();
}
PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
return Status::OK();
}
template <>
Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
int64_t num_values, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
const int32_t* data_ptr) {
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
num_values);
if (type.unit() == TimeUnit::SECOND) {
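// As in the non-nullable case, widen second-resolution times to
// milliseconds.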
for (int64_t i = 0; i < num_values; i++) {
if (valid_bits_reader.IsSet()) {
buffer_ptr[i] = data_ptr[i] * 1000;
}
valid_bits_reader.Next();
}
} else {
for (int64_t i = 0; i < num_values; i++) {
if (valid_bits_reader.IsSet()) {
buffer_ptr[i] = data_ptr[i];
}
valid_bits_reader.Next();
}
}
PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
return Status::OK();
}
#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
template <> \
Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>( \
TypedColumnWriter<ParquetType> * writer, const ArrowType& type, \
int64_t num_values, int64_t num_levels, const int16_t* def_levels, \
const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, \
const CType* data_ptr) { \
PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced( \
num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \
return Status::OK(); \
}
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
// ----------------------------------------------------------------------
// Write timestamps
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
template <>
Status FileWriter::Impl::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
int64_t num_values, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
const int64_t* data_ptr) {
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
num_values);
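// An Impala-style Int96 timestamp packs the nanoseconds within the day into
// the lower 8 bytes and the Julian day number into the upper 4 bytes;
// NanosecondsToImpalaTimestamp performs that split.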
if (type.unit() == TimeUnit::NANO) {
for (int64_t i = 0; i < num_values; i++) {
if (valid_bits_reader.IsSet()) {
internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
}
valid_bits_reader.Next();
}
} else {
return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
}
PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
return Status::OK();
}
template <>
Status FileWriter::Impl::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
int64_t num_values, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels, const int64_t* data_ptr) {
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
if (type.unit() == TimeUnit::NANO) {
for (int64_t i = 0; i < num_values; i++) {
internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
}
} else {
return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
return Status::OK();
}
Status FileWriter::Impl::WriteTimestamps(ColumnWriter* column_writer,
const std::shared_ptr<Array>& values,
int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
const auto& type = static_cast<::arrow::TimestampType&>(*values->type());
const bool is_nanosecond = type.unit() == TimeUnit::NANO;
if (is_nanosecond && arrow_properties_->support_deprecated_int96_timestamps()) {
return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(
column_writer, values, num_levels, def_levels, rep_levels);
} else if (is_nanosecond ||
(arrow_properties_->coerce_timestamps_enabled() &&
(type.unit() != arrow_properties_->coerce_timestamps_unit()))) {
// Casting is required. This covers several cases:
// * nanoseconds are cast to microseconds (the default when Int96 is disabled)
// * coerce_timestamps_enabled_: cast all timestamps to the requested unit
return WriteTimestampsCoerce(column_writer, values, num_levels, def_levels,
rep_levels);
} else {
// No casting of timestamps is required, take the fast path
return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(
column_writer, values, num_levels, def_levels, rep_levels);
}
}
Status FileWriter::Impl::WriteTimestampsCoerce(ColumnWriter* column_writer,
const std::shared_ptr<Array>& array,
int64_t num_levels,
const int16_t* def_levels,
const int16_t* rep_levels) {
// We can safely reuse data_buffer_ here because the batch writes below take
// the Int64 fast path, which consumes the buffer directly without resizing it.
RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(int64_t)));
int64_t* data_buffer_ptr = reinterpret_cast<int64_t*>(data_buffer_.mutable_data());
const auto& data = static_cast<const ::arrow::TimestampArray&>(*array);
auto data_ptr = data.raw_values();
auto writer = reinterpret_cast<TypedColumnWriter<Int64Type>*>(column_writer);
const auto& type = static_cast<const ::arrow::TimestampType&>(*array->type());
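// When no coercion unit was configured explicitly, nanoseconds are coerced
// to microseconds, the finest granularity supported on the Int64 path.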
TimeUnit::type target_unit = arrow_properties_->coerce_timestamps_enabled()
? arrow_properties_->coerce_timestamps_unit()
: TimeUnit::MICRO;
auto target_type = ::arrow::timestamp(target_unit);
auto DivideBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array->length(); i++) {
if (!data.IsNull(i) && (data_ptr[i] % factor != 0)) {
std::stringstream ss;
ss << "Casting from " << type.ToString() << " to " << target_type->ToString()
<< " would lose data: " << data_ptr[i];
return Status::Invalid(ss.str());
}
data_buffer_ptr[i] = data_ptr[i] / factor;
}
return Status::OK();
};
auto MultiplyBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array->length(); i++) {
data_buffer_ptr[i] = data_ptr[i] * factor;
}
return Status::OK();
};
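// Pick the conversion: downscaling goes through DivideBy, which fails on
// values that would lose precision, while upscaling via MultiplyBy is always
// exact.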
if (type.unit() == TimeUnit::NANO) {
if (target_unit == TimeUnit::MICRO) {
RETURN_NOT_OK(DivideBy(1000));
} else {
DCHECK_EQ(TimeUnit::MILLI, target_unit);
RETURN_NOT_OK(DivideBy(1000000));
}
} else if (type.unit() == TimeUnit::SECOND) {
RETURN_NOT_OK(MultiplyBy(target_unit == TimeUnit::MICRO ? 1000000 : 1000));
} else if (type.unit() == TimeUnit::MILLI) {
DCHECK_EQ(TimeUnit::MICRO, target_unit);
RETURN_NOT_OK(MultiplyBy(1000));
} else {
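// Only TimeUnit::MICRO remains here, so downscale microseconds to
// milliseconds.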
DCHECK_EQ(TimeUnit::MILLI, target_unit);
RETURN_NOT_OK(DivideBy(1000));
}
if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
// no nulls, just dump the data
RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
writer, static_cast<const ::arrow::TimestampType&>(*target_type), array->length(),
num_levels, def_levels, rep_levels, data_buffer_ptr)));
} else {
const uint8_t* valid_bits = data.null_bitmap_data();
RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
writer, static_cast<const ::arrow::TimestampType&>(*target_type), array->length(),
num_levels, def_levels, rep_levels, valid_bits, data.offset(), data_buffer_ptr)));
}
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
// ----------------------------------------------------------------------
// This specialization looks similar to the generic case but differs in two
// significant points:
// * the array offset is applied to the pointer as late as possible, since
//   boolean data is addressed at sub-byte granularity
// * Arrow stores booleans as a bitmap, so we cannot use std::copy to convert
//   from ArrowType::c_type to ParquetType::c_type
template <>
Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels) {
RETURN_NOT_OK(data_buffer_.Resize(array->length()));
auto data = static_cast<const BooleanArray*>(array.get());
auto data_ptr = reinterpret_cast<const uint8_t*>(data->values()->data());
auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(column_writer);
int buffer_idx = 0;
int64_t offset = array->offset();
for (int64_t i = 0; i < data->length(); i++) {
if (!data->IsNull(i)) {
buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i);
}
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
template <>
Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels) {
auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
template <>
Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels) {
RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(ByteArray)));
auto data = static_cast<const BinaryArray*>(array.get());
auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
// In the case of an array consisting of only empty strings or all nulls,
// data->value_data() is already a nullptr, so calling
// data->value_data()->data() would segfault.
const uint8_t* data_ptr = nullptr;
if (data->value_data()) {
data_ptr = reinterpret_cast<const uint8_t*>(data->value_data()->data());
DCHECK(data_ptr != nullptr);
}
auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
// Slice offset is accounted for in raw_value_offsets
const int32_t* value_offset = data->raw_value_offsets();
if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
// no nulls, just dump the data
for (int64_t i = 0; i < data->length(); i++) {
buffer_ptr[i] =
ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
}
} else {
int buffer_idx = 0;
for (int64_t i = 0; i < data->length(); i++) {
if (!data->IsNull(i)) {
buffer_ptr[buffer_idx++] =
ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
}
}
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
template <>
Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels) {
RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false));
const auto& data = static_cast<const FixedSizeBinaryArray&>(*array);
const int64_t length = data.length();
auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) {
// no nulls, just dump the data
// TODO(advancedxy): use a WriteBatch to avoid this step
for (int64_t i = 0; i < length; i++) {
buffer_ptr[i] = FixedLenByteArray(data.GetValue(i));
}
} else {
int buffer_idx = 0;
for (int64_t i = 0; i < length; i++) {
if (!data.IsNull(i)) {
buffer_ptr[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
}
}
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
template <>
Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels) {
const auto& data = static_cast<const Decimal128Array&>(*array);
const int64_t length = data.length();
// TODO(phillipc): This is potentially very wasteful if we have a lot of nulls
std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false));
auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
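// Parquet stores decimals as a minimal-width big-endian byte array, so skip
// the high-order bytes of the 16-byte value that the declared precision does
// not need.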
const int32_t offset =
decimal_type.byte_width() - DecimalSize(decimal_type.precision());
const bool does_not_have_nulls =
writer->descr()->schema_node()->is_required() || data.null_count() == 0;
// TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
// don't have to keep writing two loops to handle the case where we know there are no
// nulls
if (does_not_have_nulls) {
// no nulls, just dump the data
// TODO(advancedxy): use a WriteBatch to avoid this step
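// A Decimal128 is stored as two little-endian 64-bit words (low, high);
// write the high word first and byte-swap each word so the resulting 16
// bytes are big-endian end to end.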
for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
buffer_ptr[i] = FixedLenByteArray(
reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
}
} else {
for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
if (!data.IsNull(i)) {
auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
buffer_ptr[buffer_idx++] = FixedLenByteArray(
reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
j += 2;
}
}
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
// End of column type specializations
// ----------------------------------------------------------------------
Status FileWriter::Impl::Close() {
if (!closed_) {
// Make idempotent
closed_ = true;
if (row_group_writer_ != nullptr) {
PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
}
PARQUET_CATCH_NOT_OK(writer_->Close());
}
return Status::OK();
}
Status FileWriter::NewRowGroup(int64_t chunk_size) {
return impl_->NewRowGroup(chunk_size);
}
Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
// DictionaryArrays are not yet handled with a fast path. As a workaround, we
// convert them back to their dense, non-dictionary representation before
// writing.
if (data.type()->id() == ::arrow::Type::DICTIONARY) {
const ::arrow::DictionaryType& dict_type =
static_cast<const ::arrow::DictionaryType&>(*data.type());
// TODO(ARROW-1648): Remove this special handling once we require an Arrow
// version that has this fixed.
if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
return WriteColumnChunk(::arrow::NullArray(data.length()));
}
FunctionContext ctx(pool_);
std::shared_ptr<Array> plain_array;
RETURN_NOT_OK(
Cast(&ctx, data, dict_type.dictionary()->type(), CastOptions(), &plain_array));
return WriteColumnChunk(*plain_array);
}
ColumnWriter* column_writer;
PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
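// Map the Parquet column we are about to write back to a single-field Arrow
// schema so that level generation sees the correct field nullability.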
int current_column_idx = row_group_writer_->current_column();
std::shared_ptr<::arrow::Schema> arrow_schema;
RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
writer_->key_value_metadata(), &arrow_schema));
std::shared_ptr<Buffer> def_levels_buffer;
std::shared_ptr<Buffer> rep_levels_buffer;
int64_t values_offset;
::arrow::Type::type values_type;
int64_t num_levels;
int64_t num_values;
std::shared_ptr<Array> _values_array;
LevelBuilder level_builder(pool_);
RETURN_NOT_OK(level_builder.GenerateLevels(
data, arrow_schema->field(0), &values_offset, &values_type, &num_values,
&num_levels, &def_levels_buffer, &rep_levels_buffer, &_values_array));
const int16_t* def_levels = nullptr;
if (def_levels_buffer) {
def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
}
const int16_t* rep_levels = nullptr;
if (rep_levels_buffer) {
rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
}
std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);
#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \
case ::arrow::Type::ArrowEnum: \
return TypedWriteBatch<ParquetType, ::arrow::ArrowType>( \
column_writer, values_array, num_levels, def_levels, rep_levels);
switch (values_type) {
case ::arrow::Type::UINT32: {
if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
// Parquet 1.0 readers cannot read the UINT_32 logical type. Thus we need
// to use the larger Int64Type to store the values losslessly.
return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
column_writer, values_array, num_levels, def_levels, rep_levels);
} else {
return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
column_writer, values_array, num_levels, def_levels, rep_levels);
}
}
WRITE_BATCH_CASE(NA, NullType, Int32Type)
case ::arrow::Type::TIMESTAMP:
return WriteTimestamps(column_writer, values_array, num_levels, def_levels,
rep_levels);
WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType)
WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
default:
break;
}
PARQUET_CATCH_NOT_OK(column_writer->Close());
std::stringstream ss;
ss << "Data type not supported as list value: " << values_array->type()->ToString();
return Status::NotImplemented(ss.str());
}
Status FileWriter::WriteColumnChunk(const ::arrow::Array& array) {
return impl_->WriteColumnChunk(array);
}
Status FileWriter::Close() { return impl_->Close(); }
MemoryPool* FileWriter::memory_pool() const { return impl_->pool_; }
FileWriter::~FileWriter() {}
FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
: impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)) {}
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
std::unique_ptr<FileWriter>* writer) {
return Open(schema, pool, sink, properties, default_arrow_writer_properties(), writer);
}
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* writer) {
std::shared_ptr<SchemaDescriptor> parquet_schema;
RETURN_NOT_OK(
ToParquetSchema(&schema, *properties, *arrow_properties, &parquet_schema));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
std::unique_ptr<ParquetFileWriter> base_writer =
ParquetFileWriter::Open(sink, schema_node, properties, schema.metadata());
writer->reset(new FileWriter(pool, std::move(base_writer), arrow_properties));
return Status::OK();
}
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
std::unique_ptr<FileWriter>* writer) {
auto wrapper = std::make_shared<ArrowOutputStream>(sink);
return Open(schema, pool, wrapper, properties, writer);
}
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* writer) {
auto wrapper = std::make_shared<ArrowOutputStream>(sink);
return Open(schema, pool, wrapper, properties, arrow_properties, writer);
}
Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
// TODO(ARROW-232) Support writing chunked arrays.
for (int i = 0; i < table.num_columns(); i++) {
if (table.column(i)->data()->num_chunks() != 1) {
return Status::NotImplemented("No support for writing chunked arrays yet.");
}
}
if (chunk_size <= 0) {
return Status::Invalid("chunk size per row_group must be greater than 0");
} else if (chunk_size > impl_->properties().max_row_group_length()) {
chunk_size = impl_->properties().max_row_group_length();
}
for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
int64_t offset = chunk * chunk_size;
int64_t size = std::min(chunk_size, table.num_rows() - offset);
RETURN_NOT_OK_ELSE(NewRowGroup(size), PARQUET_IGNORE_NOT_OK(Close()));
for (int i = 0; i < table.num_columns(); i++) {
std::shared_ptr<Array> array = table.column(i)->data()->chunk(0);
array = array->Slice(offset, size);
RETURN_NOT_OK_ELSE(WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(Close()));
}
}
return Status::OK();
}
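// Example usage of the helpers below (a minimal sketch, assuming a populated
// ::arrow::Table `table`, an already-opened OutputStream `sink`, and
// default_writer_properties() from parquet/properties.h):
//
//   std::unique_ptr<FileWriter> writer;
//   RETURN_NOT_OK(FileWriter::Open(*table->schema(), ::arrow::default_memory_pool(),
//                                  sink, default_writer_properties(), &writer));
//   RETURN_NOT_OK(writer->WriteTable(*table, /*chunk_size=*/64 * 1024));
//   RETURN_NOT_OK(writer->Close());
//
// The free function WriteTable() below performs exactly these steps in one
// call.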
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
std::unique_ptr<FileWriter> writer;
RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties,
arrow_properties, &writer));
RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
return writer->Close();
}
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
auto wrapper = std::make_shared<ArrowOutputStream>(sink);
return WriteTable(table, pool, wrapper, chunk_size, properties, arrow_properties);
}
} // namespace arrow
} // namespace parquet