blob: 1410a5f89e2de080d0684d82934b32b4338bf233 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/arrow/reader_internal.h"
#include <algorithm>
#include <climits>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>
#include "arrow/array.h"
#include "arrow/compute/api.h"
#include "arrow/datum.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/base64.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/endian.h"
#include "arrow/util/int_util_internal.h"
#include "arrow/util/logging.h"
#include "arrow/util/string_view.h"
#include "arrow/util/ubsan.h"
#include "arrow/visitor_inline.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/schema_internal.h"
#include "parquet/column_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/types.h"
// Required after "arrow/util/int_util_internal.h" (for OPTIONAL)
#include "parquet/windows_compatibility.h"
using arrow::Array;
using arrow::BooleanArray;
using arrow::ChunkedArray;
using arrow::DataType;
using arrow::Datum;
using arrow::Decimal128;
using arrow::Decimal128Array;
using arrow::Decimal128Type;
using arrow::Decimal256;
using arrow::Decimal256Array;
using arrow::Decimal256Type;
using arrow::Field;
using arrow::Int32Array;
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::ResizableBuffer;
using arrow::Status;
using arrow::StructArray;
using arrow::Table;
using arrow::TimestampArray;
using ::arrow::BitUtil::FromBigEndian;
using ::arrow::internal::checked_cast;
using ::arrow::internal::checked_pointer_cast;
using ::arrow::internal::SafeLeftShift;
using ::arrow::util::SafeLoadAs;
using parquet::internal::BinaryRecordReader;
using parquet::internal::DictionaryRecordReader;
using parquet::internal::RecordReader;
using parquet::schema::GroupNode;
using parquet::schema::Node;
using parquet::schema::PrimitiveNode;
using ParquetType = parquet::Type;
namespace BitUtil = arrow::BitUtil;
namespace parquet {
namespace arrow {
namespace {
template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
template <typename CType, typename StatisticsType>
Status MakeMinMaxScalar(const StatisticsType& statistics,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
*min = ::arrow::MakeScalar(static_cast<CType>(statistics.min()));
*max = ::arrow::MakeScalar(static_cast<CType>(statistics.max()));
return Status::OK();
}
template <typename CType, typename StatisticsType>
Status MakeMinMaxTypedScalar(const StatisticsType& statistics,
std::shared_ptr<DataType> type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
ARROW_ASSIGN_OR_RAISE(*min, ::arrow::MakeScalar(type, statistics.min()));
ARROW_ASSIGN_OR_RAISE(*max, ::arrow::MakeScalar(type, statistics.max()));
return Status::OK();
}
template <typename StatisticsType>
Status MakeMinMaxIntegralScalar(const StatisticsType& statistics,
const ::arrow::DataType& arrow_type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
const auto column_desc = statistics.descr();
const auto& logical_type = column_desc->logical_type();
const auto& integer = checked_pointer_cast<const IntLogicalType>(logical_type);
const bool is_signed = integer->is_signed();
switch (integer->bit_width()) {
case 8:
return is_signed ? MakeMinMaxScalar<int8_t>(statistics, min, max)
: MakeMinMaxScalar<uint8_t>(statistics, min, max);
case 16:
return is_signed ? MakeMinMaxScalar<int16_t>(statistics, min, max)
: MakeMinMaxScalar<uint16_t>(statistics, min, max);
case 32:
return is_signed ? MakeMinMaxScalar<int32_t>(statistics, min, max)
: MakeMinMaxScalar<uint32_t>(statistics, min, max);
case 64:
return is_signed ? MakeMinMaxScalar<int64_t>(statistics, min, max)
: MakeMinMaxScalar<uint64_t>(statistics, min, max);
}
return Status::OK();
}
static Status FromInt32Statistics(const Int32Statistics& statistics,
const LogicalType& logical_type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
ARROW_ASSIGN_OR_RAISE(auto type, FromInt32(logical_type));
switch (logical_type.type()) {
case LogicalType::Type::INT:
return MakeMinMaxIntegralScalar(statistics, *type, min, max);
break;
case LogicalType::Type::DATE:
case LogicalType::Type::TIME:
case LogicalType::Type::NONE:
return MakeMinMaxTypedScalar<int32_t>(statistics, type, min, max);
break;
default:
break;
}
return Status::NotImplemented("Cannot extract statistics for type ");
}
static Status FromInt64Statistics(const Int64Statistics& statistics,
const LogicalType& logical_type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
ARROW_ASSIGN_OR_RAISE(auto type, FromInt64(logical_type));
switch (logical_type.type()) {
case LogicalType::Type::INT:
return MakeMinMaxIntegralScalar(statistics, *type, min, max);
break;
case LogicalType::Type::TIME:
case LogicalType::Type::TIMESTAMP:
case LogicalType::Type::NONE:
return MakeMinMaxTypedScalar<int64_t>(statistics, type, min, max);
break;
default:
break;
}
return Status::NotImplemented("Cannot extract statistics for type ");
}
template <typename DecimalType>
Result<std::shared_ptr<::arrow::Scalar>> FromBigEndianString(
const std::string& data, std::shared_ptr<DataType> arrow_type) {
ARROW_ASSIGN_OR_RAISE(
DecimalType decimal,
DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
static_cast<int32_t>(data.size())));
return ::arrow::MakeScalar(std::move(arrow_type), decimal);
}
// Extracts Min and Max scalar from bytes like types (i.e. types where
// decimal is encoded as little endian.
Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics,
const LogicalType& logical_type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
const DecimalLogicalType& decimal_type =
checked_cast<const DecimalLogicalType&>(logical_type);
Result<std::shared_ptr<DataType>> maybe_type =
Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
std::shared_ptr<DataType> arrow_type;
if (maybe_type.ok()) {
arrow_type = maybe_type.ValueOrDie();
ARROW_ASSIGN_OR_RAISE(
*min, FromBigEndianString<Decimal128>(statistics.EncodeMin(), arrow_type));
ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal128>(statistics.EncodeMax(),
std::move(arrow_type)));
return Status::OK();
}
// Fallback to see if Decimal256 can represent the type.
ARROW_ASSIGN_OR_RAISE(
arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale()));
ARROW_ASSIGN_OR_RAISE(
*min, FromBigEndianString<Decimal256>(statistics.EncodeMin(), arrow_type));
ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString<Decimal256>(statistics.EncodeMax(),
std::move(arrow_type)));
return Status::OK();
}
Status ByteArrayStatisticsAsScalars(const Statistics& statistics,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
auto logical_type = statistics.descr()->logical_type();
if (logical_type->type() == LogicalType::Type::DECIMAL) {
return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min, max);
}
std::shared_ptr<::arrow::DataType> type;
if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
type = ::arrow::fixed_size_binary(statistics.descr()->type_length());
} else {
type = logical_type->type() == LogicalType::Type::STRING ? ::arrow::utf8()
: ::arrow::binary();
}
ARROW_ASSIGN_OR_RAISE(
*min, ::arrow::MakeScalar(type, Buffer::FromString(statistics.EncodeMin())));
ARROW_ASSIGN_OR_RAISE(
*max, ::arrow::MakeScalar(type, Buffer::FromString(statistics.EncodeMax())));
return Status::OK();
}
} // namespace
Status StatisticsAsScalars(const Statistics& statistics,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
if (!statistics.HasMinMax()) {
return Status::Invalid("Statistics has no min max.");
}
auto column_desc = statistics.descr();
if (column_desc == nullptr) {
return Status::Invalid("Statistics carries no descriptor, can't infer arrow type.");
}
auto physical_type = column_desc->physical_type();
auto logical_type = column_desc->logical_type();
switch (physical_type) {
case Type::BOOLEAN:
return MakeMinMaxScalar<bool, BoolStatistics>(
checked_cast<const BoolStatistics&>(statistics), min, max);
case Type::FLOAT:
return MakeMinMaxScalar<float, FloatStatistics>(
checked_cast<const FloatStatistics&>(statistics), min, max);
case Type::DOUBLE:
return MakeMinMaxScalar<double, DoubleStatistics>(
checked_cast<const DoubleStatistics&>(statistics), min, max);
case Type::INT32:
return FromInt32Statistics(checked_cast<const Int32Statistics&>(statistics),
*logical_type, min, max);
case Type::INT64:
return FromInt64Statistics(checked_cast<const Int64Statistics&>(statistics),
*logical_type, min, max);
case Type::BYTE_ARRAY:
case Type::FIXED_LEN_BYTE_ARRAY:
return ByteArrayStatisticsAsScalars(statistics, min, max);
default:
return Status::NotImplemented("Extract statistics unsupported for physical_type ",
physical_type, " unsupported.");
}
return Status::OK();
}
// ----------------------------------------------------------------------
// Primitive types
namespace {
template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<DataType>& type, Datum* out) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
int64_t length = reader->values_written();
ARROW_ASSIGN_OR_RAISE(auto data,
::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool));
auto values = reinterpret_cast<const ParquetCType*>(reader->values());
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
std::copy(values, values + length, out_ptr);
*out = std::make_shared<ArrayType<ArrowType>>(
type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count());
return Status::OK();
}
std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
const std::shared_ptr<DataType>& type) {
std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
reader->ReleaseValues()};
auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(),
buffers, reader->null_count());
return ::arrow::MakeArray(data);
}
Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
int64_t length = reader->values_written();
const int64_t buffer_size = BitUtil::BytesForBits(length);
ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(buffer_size, pool));
// Transfer boolean values to packed bitmap
auto values = reinterpret_cast<const bool*>(reader->values());
uint8_t* data_ptr = data->mutable_data();
memset(data_ptr, 0, buffer_size);
for (int64_t i = 0; i < length; i++) {
if (values[i]) {
::arrow::BitUtil::SetBit(data_ptr, i);
}
}
*out = std::make_shared<BooleanArray>(length, std::move(data), reader->ReleaseIsValid(),
reader->null_count());
return Status::OK();
}
Status TransferInt96(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<DataType>& type, Datum* out) {
int64_t length = reader->values_written();
auto values = reinterpret_cast<const Int96*>(reader->values());
ARROW_ASSIGN_OR_RAISE(auto data,
::arrow::AllocateBuffer(length * sizeof(int64_t), pool));
auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
for (int64_t i = 0; i < length; i++) {
if (values[i].value[2] == 0) {
// Happens for null entries: avoid triggering UBSAN as that Int96 timestamp
// isn't representable as a 64-bit Unix timestamp.
*data_ptr++ = 0;
} else {
*data_ptr++ = Int96GetNanoSeconds(values[i]);
}
}
*out = std::make_shared<TimestampArray>(type, length, std::move(data),
reader->ReleaseIsValid(), reader->null_count());
return Status::OK();
}
Status TransferDate64(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<DataType>& type, Datum* out) {
int64_t length = reader->values_written();
auto values = reinterpret_cast<const int32_t*>(reader->values());
ARROW_ASSIGN_OR_RAISE(auto data,
::arrow::AllocateBuffer(length * sizeof(int64_t), pool));
auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
for (int64_t i = 0; i < length; i++) {
*out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
}
*out = std::make_shared<::arrow::Date64Array>(
type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count());
return Status::OK();
}
// ----------------------------------------------------------------------
// Binary, direct to dictionary-encoded
Status TransferDictionary(RecordReader* reader,
const std::shared_ptr<DataType>& logical_value_type,
std::shared_ptr<ChunkedArray>* out) {
auto dict_reader = dynamic_cast<DictionaryRecordReader*>(reader);
DCHECK(dict_reader);
*out = dict_reader->GetResult();
if (!logical_value_type->Equals(*(*out)->type())) {
ARROW_ASSIGN_OR_RAISE(*out, (*out)->View(logical_value_type));
}
return Status::OK();
}
Status TransferBinary(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<DataType>& logical_value_type,
std::shared_ptr<ChunkedArray>* out) {
if (reader->read_dictionary()) {
return TransferDictionary(
reader, ::arrow::dictionary(::arrow::int32(), logical_value_type), out);
}
::arrow::compute::ExecContext ctx(pool);
::arrow::compute::CastOptions cast_options;
cast_options.allow_invalid_utf8 = false; // avoid spending time validating UTF8 data
auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
DCHECK(binary_reader);
auto chunks = binary_reader->GetBuilderChunks();
for (auto& chunk : chunks) {
if (!chunk->type()->Equals(*logical_value_type)) {
// XXX: if a LargeBinary chunk is larger than 2GB, the MSBs of offsets
// will be lost because they are first created as int32 and then cast to int64.
ARROW_ASSIGN_OR_RAISE(
chunk, ::arrow::compute::Cast(*chunk, logical_value_type, cast_options, &ctx));
}
}
*out = std::make_shared<ChunkedArray>(chunks, logical_value_type);
return Status::OK();
}
// ----------------------------------------------------------------------
// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || Decimal256
template <typename DecimalType>
Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
uint8_t* out_buf) {
ARROW_ASSIGN_OR_RAISE(DecimalType t, DecimalType::FromBigEndian(value, byte_width));
t.ToBytes(out_buf);
return ::arrow::Status::OK();
}
template <typename DecimalArrayType>
struct DecimalTypeTrait;
template <>
struct DecimalTypeTrait<::arrow::Decimal128Array> {
using value = ::arrow::Decimal128;
};
template <>
struct DecimalTypeTrait<::arrow::Decimal256Array> {
using value = ::arrow::Decimal256;
};
template <typename DecimalArrayType, typename ParquetType>
struct DecimalConverter {
static inline Status ConvertToDecimal(const Array& array,
const std::shared_ptr<DataType>&,
MemoryPool* pool, std::shared_ptr<Array>*) {
return Status::NotImplemented("not implemented");
}
};
template <typename DecimalArrayType>
struct DecimalConverter<DecimalArrayType, FLBAType> {
static inline Status ConvertToDecimal(const Array& array,
const std::shared_ptr<DataType>& type,
MemoryPool* pool, std::shared_ptr<Array>* out) {
const auto& fixed_size_binary_array =
checked_cast<const ::arrow::FixedSizeBinaryArray&>(array);
// The byte width of each decimal value
const int32_t type_length =
checked_cast<const ::arrow::DecimalType&>(*type).byte_width();
// number of elements in the entire array
const int64_t length = fixed_size_binary_array.length();
// Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
// this will be different from the decimal array width because we write the minimum
// number of bytes necessary to represent a given precision
const int32_t byte_width =
checked_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
.byte_width();
// allocate memory for the decimal array
ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
// raw bytes that we can write to
uint8_t* out_ptr = data->mutable_data();
// convert each FixedSizeBinary value to valid decimal bytes
const int64_t null_count = fixed_size_binary_array.null_count();
using DecimalType = typename DecimalTypeTrait<DecimalArrayType>::value;
if (null_count > 0) {
for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
if (!fixed_size_binary_array.IsNull(i)) {
RETURN_NOT_OK(RawBytesToDecimalBytes<DecimalType>(
fixed_size_binary_array.GetValue(i), byte_width, out_ptr));
} else {
std::memset(out_ptr, 0, type_length);
}
}
} else {
for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
RETURN_NOT_OK(RawBytesToDecimalBytes<DecimalType>(
fixed_size_binary_array.GetValue(i), byte_width, out_ptr));
}
}
*out = std::make_shared<DecimalArrayType>(
type, length, std::move(data), fixed_size_binary_array.null_bitmap(), null_count);
return Status::OK();
}
};
template <typename DecimalArrayType>
struct DecimalConverter<DecimalArrayType, ByteArrayType> {
static inline Status ConvertToDecimal(const Array& array,
const std::shared_ptr<DataType>& type,
MemoryPool* pool, std::shared_ptr<Array>* out) {
const auto& binary_array = checked_cast<const ::arrow::BinaryArray&>(array);
const int64_t length = binary_array.length();
const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*type);
const int64_t type_length = decimal_type.byte_width();
ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
// raw bytes that we can write to
uint8_t* out_ptr = data->mutable_data();
const int64_t null_count = binary_array.null_count();
// convert each BinaryArray value to valid decimal bytes
for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
int32_t record_len = 0;
const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
if (record_len < 0 || record_len > type_length) {
return Status::Invalid("Invalid BYTE_ARRAY length for ", type->ToString());
}
auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
out_ptr_view[0] = 0;
out_ptr_view[1] = 0;
// only convert rows that are not null if there are nulls, or
// all rows, if there are not
if ((null_count > 0 && !binary_array.IsNull(i)) || null_count <= 0) {
using DecimalType = typename DecimalTypeTrait<DecimalArrayType>::value;
RETURN_NOT_OK(
RawBytesToDecimalBytes<DecimalType>(record_loc, record_len, out_ptr));
}
}
*out = std::make_shared<DecimalArrayType>(type, length, std::move(data),
binary_array.null_bitmap(), null_count);
return Status::OK();
}
};
/// \brief Convert an Int32 or Int64 array into a Decimal128Array
/// The parquet spec allows systems to write decimals in int32, int64 if the values are
/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
template <
typename ParquetIntegerType,
typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, Int32Type>::value ||
std::is_same<ParquetIntegerType, Int64Type>::value>>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<DataType>& type, Datum* out) {
// Decimal128 and Decimal256 are only Arrow constructs. Parquet does not
// specifically distinguish between decimal byte widths.
// Decimal256 isn't relevant here because the Arrow-Parquet C++ bindings never
// write Decimal values as integers and if the decimal value can fit in an
// integer it is wasteful to use Decimal256. Put another way, the only
// way an integer column could be construed as Decimal256 is if an arrow
// schema was stored as metadata in the file indicating the column was
// Decimal256. The current Arrow-Parquet C++ bindings will never do this.
DCHECK(type->id() == ::arrow::Type::DECIMAL128);
const int64_t length = reader->values_written();
using ElementType = typename ParquetIntegerType::c_type;
static_assert(std::is_same<ElementType, int32_t>::value ||
std::is_same<ElementType, int64_t>::value,
"ElementType must be int32_t or int64_t");
const auto values = reinterpret_cast<const ElementType*>(reader->values());
const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*type);
const int64_t type_length = decimal_type.byte_width();
ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
uint8_t* out_ptr = data->mutable_data();
using ::arrow::BitUtil::FromLittleEndian;
for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
// sign/zero extend int32_t values, otherwise a no-op
const auto value = static_cast<int64_t>(values[i]);
::arrow::Decimal128 decimal(value);
decimal.ToBytes(out_ptr);
}
if (reader->nullable_values()) {
std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
*out = std::make_shared<Decimal128Array>(type, length, std::move(data), is_valid,
reader->null_count());
} else {
*out = std::make_shared<Decimal128Array>(type, length, std::move(data));
}
return Status::OK();
}
/// \brief Convert an arrow::BinaryArray to an arrow::Decimal{128,256}Array
/// We do this by:
/// 1. Creating an arrow::BinaryArray from the RecordReader's builder
/// 2. Allocating a buffer for the arrow::Decimal{128,256}Array
/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
/// representing the high and low bits of each decimal value.
template <typename DecimalArrayType, typename ParquetType>
Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<DataType>& type, Datum* out) {
auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
DCHECK(binary_reader);
::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
for (size_t i = 0; i < chunks.size(); ++i) {
std::shared_ptr<Array> chunk_as_decimal;
auto fn = &DecimalConverter<DecimalArrayType, ParquetType>::ConvertToDecimal;
RETURN_NOT_OK(fn(*chunks[i], type, pool, &chunk_as_decimal));
// Replace the chunk, which will hopefully also free memory as we go
chunks[i] = chunk_as_decimal;
}
*out = std::make_shared<ChunkedArray>(chunks, type);
return Status::OK();
}
} // namespace
#define TRANSFER_INT32(ENUM, ArrowType) \
case ::arrow::Type::ENUM: { \
Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_type, &result); \
RETURN_NOT_OK(s); \
} break;
#define TRANSFER_INT64(ENUM, ArrowType) \
case ::arrow::Type::ENUM: { \
Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_type, &result); \
RETURN_NOT_OK(s); \
} break;
Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_type,
const ColumnDescriptor* descr, MemoryPool* pool,
std::shared_ptr<ChunkedArray>* out) {
Datum result;
std::shared_ptr<ChunkedArray> chunked_result;
switch (value_type->id()) {
case ::arrow::Type::DICTIONARY: {
RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result));
result = chunked_result;
} break;
case ::arrow::Type::NA: {
result = std::make_shared<::arrow::NullArray>(reader->values_written());
break;
}
case ::arrow::Type::INT32:
case ::arrow::Type::INT64:
case ::arrow::Type::FLOAT:
case ::arrow::Type::DOUBLE:
result = TransferZeroCopy(reader, value_type);
break;
case ::arrow::Type::BOOL:
RETURN_NOT_OK(TransferBool(reader, pool, &result));
break;
TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
TRANSFER_INT32(INT8, ::arrow::Int8Type);
TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
TRANSFER_INT32(INT16, ::arrow::Int16Type);
TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
TRANSFER_INT32(DATE32, ::arrow::Date32Type);
TRANSFER_INT32(TIME32, ::arrow::Time32Type);
TRANSFER_INT64(TIME64, ::arrow::Time64Type);
case ::arrow::Type::DATE64:
RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
break;
case ::arrow::Type::FIXED_SIZE_BINARY:
case ::arrow::Type::BINARY:
case ::arrow::Type::STRING:
case ::arrow::Type::LARGE_BINARY:
case ::arrow::Type::LARGE_STRING: {
RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
result = chunked_result;
} break;
case ::arrow::Type::DECIMAL128: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_type, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_type, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_type, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
RETURN_NOT_OK(fn(reader, pool, value_type, &result));
} break;
default:
return Status::Invalid(
"Physical type for decimal128 must be int32, int64, byte array, or fixed "
"length binary");
}
} break;
case ::arrow::Type::DECIMAL256:
switch (descr->physical_type()) {
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_type, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
RETURN_NOT_OK(fn(reader, pool, value_type, &result));
} break;
default:
return Status::Invalid(
"Physical type for decimal256 must be fixed length binary");
}
break;
case ::arrow::Type::TIMESTAMP: {
const ::arrow::TimestampType& timestamp_type =
checked_cast<::arrow::TimestampType&>(*value_type);
switch (timestamp_type.unit()) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO: {
result = TransferZeroCopy(reader, value_type);
} break;
case ::arrow::TimeUnit::NANO: {
if (descr->physical_type() == ::parquet::Type::INT96) {
RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result));
} else {
result = TransferZeroCopy(reader, value_type);
}
} break;
default:
return Status::NotImplemented("TimeUnit not supported");
}
} break;
default:
return Status::NotImplemented("No support for reading columns of type ",
value_type->ToString());
}
if (result.kind() == Datum::ARRAY) {
*out = std::make_shared<ChunkedArray>(result.make_array());
} else if (result.kind() == Datum::CHUNKED_ARRAY) {
*out = result.chunked_array();
} else {
DCHECK(false) << "Should be impossible, result was " << result.ToString();
}
return Status::OK();
}
} // namespace arrow
} // namespace parquet