PARQUET-1095: [C++] Read and write Arrow decimal values
This depends on:
- [x] [ARROW-1607](https://github.com/apache/arrow/pull/1128)
- [x] [ARROW-1656](https://github.com/apache/arrow/pull/1184)
- [x] [ARROW-1588](https://github.com/apache/arrow/pull/1211)
- [x] Add tests for writing different sizes of values
Author: Phillip Cloud <cpcloud@gmail.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #403 from cpcloud/PARQUET-1095 and squashes the following commits:
8c3d222 [Phillip Cloud] Remove loop from BytesToInteger
63018bc [Wes McKinney] Suppress C4996 due to arrow/util/variant.h
e4b02d3 [Phillip Cloud] Refactor types.h
83948ec [Phillip Cloud] Add last_value_ init
51965cd [Phillip Cloud] Min commit that contains the unique kernel in arrow
e25c59b [Phillip Cloud] Fix reader writer test for unique kernel addition
da0a7eb [Phillip Cloud] Update for ARROW-1811
16935de [Phillip Cloud] Reverse operand order and explicit cast
6036ca5 [Phillip Cloud] ARROW-1811
c5c4294 [Phillip Cloud] Fix issues
32a4abe [Phillip Cloud] Cleanup iteration a bit
920832a [Phillip Cloud] Update arrow version
9f97c1d [Phillip Cloud] Update for ARROW-1794: rename DecimalArray to Decimal128Array
b2e0290 [Phillip Cloud] IWYU
64748a8 [Phillip Cloud] Copy from arrow for now
6c9e2a7 [Phillip Cloud] Reduce the number of decimal test cases
7ab2e5c [Phillip Cloud] Parameterize on precision
30655d6 [Phillip Cloud] Use arrow random_decimals
9ff7eb4 [Phillip Cloud] Remove specific template parameters
1eee6a9 [Phillip Cloud] Remove specific randint call
8808e4c [Phillip Cloud] Bump arrow version
659fbc1 [Phillip Cloud] Fix deprecated API call
e162ca1 [Phillip Cloud] Allocate scratch space to hold the byteswapped values
5c9292b [Phillip Cloud] Proper dcheck call
1782da0 [Phillip Cloud] Use arrow
3d243d5 [Phillip Cloud] Checkpoint [ci skip]
028fb03 [Phillip Cloud] Remove garbage values
46dff15 [Phillip Cloud] Clean up uint32 test
613255e [Phillip Cloud] Do not use std::copy when reinterpret_cast will suffice
2917a62 [Phillip Cloud] PARQUET-1095: [C++] Read and write Arrow decimal values
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5aa8b93..c524ceb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -460,6 +460,11 @@
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CLANG_OPTIONS}")
endif()
+if ("${COMPILER_FAMILY}" STREQUAL "msvc")
+ # MSVC version of -Wno-deprecated
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4996")
+endif()
+
############################################################
# "make lint" target
############################################################
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index a470fc1..fe1d499 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -366,7 +366,7 @@
-DARROW_BUILD_TESTS=OFF)
if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
- set(ARROW_VERSION "0e21f84c2fc26dba949a03ee7d7ebfade0a65b81") # Arrow 0.7.1
+ set(ARROW_VERSION "f2806fa518583907a129b2ecb0b7ec8758b69e17")
else()
set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
endif()
diff --git a/data/fixed_length_decimal.parquet b/data/fixed_length_decimal.parquet
new file mode 100644
index 0000000..69fce53
--- /dev/null
+++ b/data/fixed_length_decimal.parquet
Binary files differ
diff --git a/data/fixed_length_decimal_legacy.parquet b/data/fixed_length_decimal_legacy.parquet
new file mode 100644
index 0000000..b0df62a
--- /dev/null
+++ b/data/fixed_length_decimal_legacy.parquet
Binary files differ
diff --git a/data/int32_decimal.parquet b/data/int32_decimal.parquet
new file mode 100644
index 0000000..5bf2d4e
--- /dev/null
+++ b/data/int32_decimal.parquet
Binary files differ
diff --git a/data/int64_decimal.parquet b/data/int64_decimal.parquet
new file mode 100644
index 0000000..5043bca
--- /dev/null
+++ b/data/int64_decimal.parquet
Binary files differ
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index a18c565..0e0831e 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -24,6 +24,7 @@
#include "gtest/gtest.h"
#include <sstream>
+#include <arrow/compute/api.h>
#include "parquet/api/reader.h"
#include "parquet/api/writer.h"
@@ -37,13 +38,13 @@
#include "arrow/api.h"
#include "arrow/test-util.h"
+#include "arrow/util/decimal.h"
using arrow::Array;
using arrow::ArrayVisitor;
using arrow::Buffer;
using arrow::ChunkedArray;
using arrow::Column;
-using arrow::EncodeArrayToDictionary;
using arrow::ListArray;
using arrow::PoolBuffer;
using arrow::PrimitiveArray;
@@ -51,6 +52,9 @@
using arrow::Table;
using arrow::TimeUnit;
using arrow::default_memory_pool;
+using arrow::compute::DictionaryEncode;
+using arrow::compute::FunctionContext;
+using arrow::compute::Datum;
using arrow::io::BufferReader;
using arrow::test::randint;
@@ -68,10 +72,10 @@
namespace parquet {
namespace arrow {
-const int SMALL_SIZE = 100;
-const int LARGE_SIZE = 10000;
+static constexpr int SMALL_SIZE = 100;
+static constexpr int LARGE_SIZE = 10000;
-constexpr uint32_t kDefaultSeed = 0;
+static constexpr uint32_t kDefaultSeed = 0;
LogicalType::type get_logical_type(const ::arrow::DataType& type) {
switch (type.id()) {
@@ -118,6 +122,8 @@
static_cast<const ::arrow::DictionaryType&>(type);
return get_logical_type(*dict_type.dictionary()->type());
}
+ case ArrowId::DECIMAL:
+ return LogicalType::DECIMAL;
default:
break;
}
@@ -147,6 +153,7 @@
case ArrowId::STRING:
return ParquetType::BYTE_ARRAY;
case ArrowId::FIXED_SIZE_BINARY:
+ case ArrowId::DECIMAL:
return ParquetType::FIXED_LEN_BYTE_ARRAY;
case ArrowId::DATE32:
return ParquetType::INT32;
@@ -299,6 +306,7 @@
const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT
const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); // NOLINT
const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed"); // NOLINT
+
template <typename T>
using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
@@ -342,28 +350,44 @@
static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType& type,
Repetition::type repetition) {
- int byte_width;
- // Decimal is not implemented yet.
+ int32_t byte_width = -1;
+ int32_t precision = -1;
+ int32_t scale = -1;
+
switch (type.id()) {
case ::arrow::Type::DICTIONARY: {
- const ::arrow::DictionaryType& dict_type =
- static_cast<const ::arrow::DictionaryType&>(type);
+ const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
const ::arrow::DataType& values_type = *dict_type.dictionary()->type();
- if (values_type.id() == ::arrow::Type::FIXED_SIZE_BINARY) {
- byte_width =
- static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
- } else {
- byte_width = -1;
+ switch (values_type.id()) {
+ case ::arrow::Type::FIXED_SIZE_BINARY:
+ byte_width =
+ static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
+ break;
+ case ::arrow::Type::DECIMAL: {
+ const auto& decimal_type =
+ static_cast<const ::arrow::Decimal128Type&>(values_type);
+ precision = decimal_type.precision();
+ scale = decimal_type.scale();
+ byte_width = DecimalSize(precision);
+ } break;
+ default:
+ break;
}
} break;
case ::arrow::Type::FIXED_SIZE_BINARY:
byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
break;
+ case ::arrow::Type::DECIMAL: {
+ const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
+ precision = decimal_type.precision();
+ scale = decimal_type.scale();
+ byte_width = DecimalSize(precision);
+ } break;
default:
- byte_width = -1;
+ break;
}
auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
- get_logical_type(type), byte_width);
+ get_logical_type(type), byte_width, precision, scale);
NodePtr node_ =
GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
return std::static_pointer_cast<GroupNode>(node_);
@@ -371,7 +395,7 @@
namespace internal {
-void AssertArraysEqual(const Array &expected, const Array &actual) {
+void AssertArraysEqual(const Array& expected, const Array& actual) {
if (!actual.Equals(expected)) {
std::stringstream pp_result;
std::stringstream pp_expected;
@@ -526,11 +550,19 @@
// There we write an UInt32 Array but receive an Int64 Array as result for
// Parquet version 1.0.
-typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
- ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type,
- ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type,
- ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
- ::arrow::BinaryType, ::arrow::FixedSizeBinaryType>
+typedef ::testing::Types<
+ ::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type,
+ ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type,
+ ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
+ ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>,
+ DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>,
+ DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>,
+ DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>,
+ DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>,
+ DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>,
+ DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>,
+ DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>,
+ DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>>
TestTypes;
TYPED_TEST_CASE(TestParquetIO, TestTypes);
@@ -590,8 +622,10 @@
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
- std::shared_ptr<Array> dict_values;
- ASSERT_OK(EncodeArrayToDictionary(*values, default_memory_pool(), &dict_values));
+ Datum out;
+ FunctionContext ctx(default_memory_pool());
+ ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out));
+ std::shared_ptr<Array> dict_values = MakeArray(out.array());
std::shared_ptr<GroupNode> schema =
MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
this->WriteColumn(schema, dict_values);
@@ -856,25 +890,43 @@
ASSERT_OK_NO_THROW(
WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties));
- std::shared_ptr<Array> expected_values;
std::shared_ptr<PoolBuffer> int64_data =
std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
{
ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
- int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
- const uint32_t* uint32_data_ptr =
- reinterpret_cast<const uint32_t*>(values->values()->data());
- // std::copy might be faster but this is explicit on the casts)
- for (int64_t i = 0; i < values->length(); i++) {
- int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
- }
+ auto int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
+ auto uint32_data_ptr = reinterpret_cast<const uint32_t*>(values->values()->data());
+ const auto cast_uint32_to_int64 = [](uint32_t value) {
+ return static_cast<int64_t>(value);
+ };
+ std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), int64_data_ptr,
+ cast_uint32_to_int64);
}
std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), int64_data};
auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(),
buffers, values->null_count());
- ASSERT_OK(MakeArray(arr_data, &expected_values));
- this->ReadAndCheckSingleColumnTable(expected_values);
+ std::shared_ptr<Array> expected_values = MakeArray(arr_data);
+ ASSERT_NE(expected_values, NULLPTR);
+
+ const auto& expected = static_cast<const ::arrow::Int64Array&>(*expected_values);
+ ASSERT_GT(values->length(), 0);
+ ASSERT_EQ(values->length(), expected.length());
+
+ // TODO(phillipc): Is there a better way to compare these two arrays?
+ // AssertArraysEqual requires the same type, but we only care about values in this case
+ for (int i = 0; i < expected.length(); ++i) {
+ const bool value_is_valid = values->IsValid(i);
+ const bool expected_value_is_valid = expected.IsValid(i);
+
+ ASSERT_EQ(expected_value_is_valid, value_is_valid);
+
+ if (value_is_valid) {
+ uint32_t value = values->Value(i);
+ int64_t expected_value = expected.Value(i);
+ ASSERT_EQ(expected_value, static_cast<int64_t>(value));
+ }
+ }
}
using TestStringParquetIO = TestParquetIO<::arrow::StringType>;
@@ -1432,7 +1484,7 @@
offset_values.push_back(total_elements);
std::vector<int8_t> value_draws;
- randint<int8_t>(total_elements, 0, 100, &value_draws);
+ randint(total_elements, 0, 100, &value_draws);
std::vector<bool> is_valid;
random_is_valid(total_elements, 0.1, &is_valid);
@@ -1889,6 +1941,61 @@
ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
}
+class TestArrowReaderAdHocSpark
+ : public ::testing::TestWithParam<
+ std::tuple<std::string, std::shared_ptr<::arrow::DataType>>> {};
+
+TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
+ std::string path(std::getenv("PARQUET_TEST_DATA"));
+
+ std::string filename;
+ std::shared_ptr<::arrow::DataType> decimal_type;
+ std::tie(filename, decimal_type) = GetParam();
+
+ path += "/" + filename;
+ ASSERT_GT(path.size(), 0);
+
+ auto pool = ::arrow::default_memory_pool();
+
+ std::unique_ptr<FileReader> arrow_reader;
+ ASSERT_NO_THROW(
+ arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false))));
+ std::shared_ptr<::arrow::Table> table;
+ ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
+
+ ASSERT_EQ(1, table->num_columns());
+
+ constexpr int32_t expected_length = 24;
+
+ auto value_column = table->column(0);
+ ASSERT_EQ(expected_length, value_column->length());
+
+ auto raw_array = value_column->data();
+ ASSERT_EQ(1, raw_array->num_chunks());
+
+ auto chunk = raw_array->chunk(0);
+
+ std::shared_ptr<Array> expected_array;
+
+ ::arrow::Decimal128Builder builder(decimal_type, pool);
+
+ for (int32_t i = 0; i < expected_length; ++i) {
+ ::arrow::Decimal128 value((i + 1) * 100);
+ ASSERT_OK(builder.Append(value));
+ }
+ ASSERT_OK(builder.Finish(&expected_array));
+
+ internal::AssertArraysEqual(*expected_array, *chunk);
+}
+
+INSTANTIATE_TEST_CASE_P(
+ ReadDecimals, TestArrowReaderAdHocSpark,
+ ::testing::Values(
+ std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)),
+ std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)),
+ std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)),
+ std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2))));
+
} // namespace arrow
} // namespace parquet
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index a7a62c5..7ed9ad8 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -51,7 +51,7 @@
const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO);
const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO);
const auto BINARY = ::arrow::binary();
-const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
+const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4);
class TestConvertParquetSchema : public ::testing::Test {
public:
@@ -62,8 +62,8 @@
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
- EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
- << " != " << rhs->ToString();
+ EXPECT_TRUE(lhs->Equals(rhs))
+ << i << " " << lhs->ToString() << " != " << rhs->ToString();
}
}
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 5edc837..3ca49cb 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -29,6 +29,7 @@
#include "arrow/api.h"
#include "arrow/util/bit-util.h"
+#include "arrow/util/decimal.h"
#include "arrow/util/logging.h"
#include "arrow/util/parallel.h"
@@ -716,7 +717,8 @@
template <typename ArrowType, typename ParquetType>
using supports_fast_path =
- typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
+ typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value,
+ ParquetType>::type;
template <typename ArrowType, typename ParquetType, typename Enable = void>
struct TransferFunctor {
@@ -868,12 +870,248 @@
// Convert from BINARY type to STRING
auto new_data = (*out)->data()->ShallowCopy();
new_data->type = type;
- RETURN_NOT_OK(::arrow::MakeArray(new_data, out));
+ *out = ::arrow::MakeArray(new_data);
}
return Status::OK();
}
};
+static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
+ using ::arrow::BitUtil::FromBigEndian;
+
+ const int32_t length = stop - start;
+
+ DCHECK_GE(length, 0);
+ DCHECK_LE(length, 8);
+
+ switch (length) {
+ case 0:
+ return 0;
+ case 1:
+ return bytes[start];
+ case 2:
+ return FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start));
+ case 3: {
+ const uint64_t first_two_bytes =
+ FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start));
+ const uint64_t last_byte = bytes[stop - 1];
+ return first_two_bytes << 8 | last_byte;
+ }
+ case 4:
+ return FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+ case 5: {
+ const uint64_t first_four_bytes =
+ FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+ const uint64_t last_byte = bytes[stop - 1];
+ return first_four_bytes << 8 | last_byte;
+ }
+ case 6: {
+ const uint64_t first_four_bytes =
+ FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+ const uint64_t last_two_bytes =
+ FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4));
+ return first_four_bytes << 16 | last_two_bytes;
+ }
+ case 7: {
+ const uint64_t first_four_bytes =
+ FromBigEndian(*reinterpret_cast<const uint32_t*>(bytes + start));
+ const uint64_t second_two_bytes =
+ FromBigEndian(*reinterpret_cast<const uint16_t*>(bytes + start + 4));
+ const uint64_t last_byte = bytes[stop - 1];
+ return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
+ }
+ case 8:
+ return FromBigEndian(*reinterpret_cast<const uint64_t*>(bytes + start));
+ default: {
+ DCHECK(false);
+ return UINT64_MAX;
+ }
+ }
+}
+
+static constexpr int32_t kMinDecimalBytes = 1;
+static constexpr int32_t kMaxDecimalBytes = 16;
+
+/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
+/// uint64_t (low bits).
+static void BytesToIntegerPair(const uint8_t* bytes,
+ const int32_t total_number_of_bytes_used, int64_t* high,
+ uint64_t* low) {
+ DCHECK_GE(total_number_of_bytes_used, kMinDecimalBytes);
+ DCHECK_LE(total_number_of_bytes_used, kMaxDecimalBytes);
+
+ /// Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
+ /// sign bit.
+ const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
+
+ /// Sign extend the low bits if necessary
+ *low = UINT64_MAX * (is_negative && total_number_of_bytes_used < 8);
+ *high = -1 * (is_negative && total_number_of_bytes_used < kMaxDecimalBytes);
+
+ /// Stop byte of the high bytes
+ const int32_t high_bits_offset = std::max(0, total_number_of_bytes_used - 8);
+
+ /// Shift left enough bits to make room for the incoming int64_t
+ *high <<= high_bits_offset * CHAR_BIT;
+
+ /// Preserve the upper bits by inplace OR-ing the int64_t
+ *high |= BytesToInteger(bytes, 0, high_bits_offset);
+
+ /// Stop byte of the low bytes
+ const int32_t low_bits_offset = std::min(total_number_of_bytes_used, 8);
+
+ /// Shift left enough bits to make room for the incoming uint64_t
+ *low <<= low_bits_offset * CHAR_BIT;
+
+ /// Preserve the upper bits by inplace OR-ing the uint64_t
+ *low |= BytesToInteger(bytes, high_bits_offset, total_number_of_bytes_used);
+}
+
+static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
+ uint8_t* out_buf) {
+ // view the first 8 bytes as an unsigned 64-bit integer
+ auto low = reinterpret_cast<uint64_t*>(out_buf);
+
+ // view the second 8 bytes as a signed 64-bit integer
+ auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
+
+ // Convert the fixed size binary array bytes into a Decimal128 compatible layout
+ BytesToIntegerPair(value, byte_width, high, low);
+}
+
+/// \brief Convert an array of FixedLenByteArrays to an arrow::Decimal128Array
+/// We do this by:
+/// 1. Creating a arrow::FixedSizeBinaryArray from the RecordReader's builder
+/// 2. Allocating a buffer for the arrow::Decimal128Array
+/// 3. Converting the big-endian bytes in the FixedSizeBinaryArray to two integers
+/// representing the high and low bits of each decimal value.
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, FLBAType> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+
+ // Finish the built data into a temporary array
+ std::shared_ptr<Array> array;
+ RETURN_NOT_OK(reader->builder()->Finish(&array));
+ const auto& fixed_size_binary_array =
+ static_cast<const ::arrow::FixedSizeBinaryArray&>(*array);
+
+ // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
+ // this will be different from the decimal array width because we write the minimum
+ // number of bytes necessary to represent a given precision
+ const int32_t byte_width =
+ static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
+ .byte_width();
+
+ // The byte width of each decimal value
+ const int32_t type_length =
+ static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+ // number of elements in the entire array
+ const int64_t length = fixed_size_binary_array.length();
+
+ // allocate memory for the decimal array
+ std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+
+ // raw bytes that we can write to
+ uint8_t* out_ptr = data->mutable_data();
+
+ // convert each FixedSizeBinary value to valid decimal bytes
+ const int64_t null_count = fixed_size_binary_array.null_count();
+ if (null_count > 0) {
+ for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+ if (!fixed_size_binary_array.IsNull(i)) {
+ RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width,
+ out_ptr);
+ }
+ }
+ } else {
+ for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+ RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
+ }
+ }
+
+ *out = std::make_shared<::arrow::Decimal128Array>(
+ type, length, data, fixed_size_binary_array.null_bitmap(), null_count);
+ return Status::OK();
+ }
+};
+
+/// \brief Convert an Int32 or Int64 array into a Decimal128Array
+/// The parquet spec allows systems to write decimals in int32, int64 if the values are
+/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
+/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
+template <typename ParquetIntegerType,
+ typename = typename std::enable_if<
+ std::is_same<ParquetIntegerType, Int32Type>::value ||
+ std::is_same<ParquetIntegerType, Int64Type>::value>::type>
+static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+
+ const int64_t length = reader->values_written();
+
+ using ElementType = typename ParquetIntegerType::c_type;
+ static_assert(std::is_same<ElementType, int32_t>::value ||
+ std::is_same<ElementType, int64_t>::value,
+ "ElementType must be int32_t or int64_t");
+
+ const auto values = reinterpret_cast<const ElementType*>(reader->values());
+
+ const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+ const int64_t type_length = decimal_type.byte_width();
+
+ std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+ uint8_t* out_ptr = data->mutable_data();
+
+ using ::arrow::BitUtil::FromLittleEndian;
+
+ for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+ // sign/zero extend int32_t values, otherwise a no-op
+ const auto value = static_cast<int64_t>(values[i]);
+
+ auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
+
+ // No-op on little endian machines, byteswap on big endian
+ out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value));
+
+ // no need to byteswap here because we're sign/zero extending exactly 8 bytes
+ out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0);
+ }
+
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid,
+ reader->null_count());
+ } else {
+ *out = std::make_shared<::arrow::Decimal128Array>(type, length, data);
+ }
+ return Status::OK();
+}
+
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, Int32Type> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ return DecimalIntegerTransfer<Int32Type>(reader, pool, type, out);
+ }
+};
+
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, Int64Type> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ return DecimalIntegerTransfer<Int64Type>(reader, pool, type, out);
+ }
+};
+
#define TRANSFER_DATA(ArrowType, ParquetType) \
TransferFunctor<ArrowType, ParquetType> func; \
RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \
@@ -932,6 +1170,22 @@
TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
+ case ::arrow::Type::DECIMAL: {
+ switch (descr_->physical_type()) {
+ case ::parquet::Type::INT32: {
+ TRANSFER_DATA(::arrow::Decimal128Type, Int32Type);
+ } break;
+ case ::parquet::Type::INT64: {
+ TRANSFER_DATA(::arrow::Decimal128Type, Int64Type);
+ } break;
+ case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
+ TRANSFER_DATA(::arrow::Decimal128Type, FLBAType);
+ } break;
+ default:
+ return Status::Invalid(
+ "Physical type for decimal must be int32, int64, or fixed length binary");
+ }
+ } break;
case ::arrow::Type::TIMESTAMP: {
::arrow::TimestampType* timestamp_type =
static_cast<::arrow::TimestampType*>(field_->type().get());
@@ -946,8 +1200,7 @@
default:
return Status::NotImplemented("TimeUnit not supported");
}
- break;
- }
+ } break;
TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
default:
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
index 7275d2f..6405ee7 100644
--- a/src/parquet/arrow/record_reader.cc
+++ b/src/parquet/arrow/record_reader.cc
@@ -83,7 +83,7 @@
Reset();
}
- virtual ~RecordReaderImpl() {}
+ virtual ~RecordReaderImpl() = default;
virtual int64_t ReadRecords(int64_t num_records) = 0;
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index e16a1af..41da9fb 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -25,6 +25,7 @@
#include "parquet/util/schema-util.h"
#include "arrow/api.h"
+#include "arrow/util/logging.h"
using arrow::Field;
using arrow::Status;
@@ -49,10 +50,9 @@
const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
-TypePtr MakeDecimalType(const PrimitiveNode& node) {
- int precision = node.decimal_metadata().precision;
- int scale = node.decimal_metadata().scale;
- return std::make_shared<::arrow::DecimalType>(precision, scale);
+TypePtr MakeDecimal128Type(const PrimitiveNode& node) {
+ const auto& metadata = node.decimal_metadata();
+ return ::arrow::decimal(metadata.precision, metadata.scale);
}
static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) {
@@ -61,7 +61,7 @@
*out = ::arrow::utf8();
break;
case LogicalType::DECIMAL:
- *out = MakeDecimalType(node);
+ *out = MakeDecimal128Type(node);
break;
default:
// BINARY
@@ -77,7 +77,7 @@
*out = ::arrow::fixed_size_binary(node.type_length());
break;
case LogicalType::DECIMAL:
- *out = MakeDecimalType(node);
+ *out = MakeDecimal128Type(node);
break;
default:
std::stringstream ss;
@@ -120,7 +120,7 @@
*out = ::arrow::time32(::arrow::TimeUnit::MILLI);
break;
case LogicalType::DECIMAL:
- *out = MakeDecimalType(node);
+ *out = MakeDecimal128Type(node);
break;
default:
std::stringstream ss;
@@ -144,7 +144,7 @@
*out = ::arrow::uint64();
break;
case LogicalType::DECIMAL:
- *out = MakeDecimalType(node);
+ *out = MakeDecimal128Type(node);
break;
case LogicalType::TIMESTAMP_MILLIS:
*out = TIMESTAMP_MS;
@@ -473,7 +473,10 @@
ParquetType::type type;
Repetition::type repetition =
field->nullable() ? Repetition::OPTIONAL : Repetition::REQUIRED;
+
int length = -1;
+ int precision = -1;
+ int scale = -1;
switch (field->type()->id()) {
case ArrowType::NA:
@@ -532,9 +535,18 @@
break;
case ArrowType::FIXED_SIZE_BINARY: {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
- auto fixed_size_binary_type =
- static_cast<::arrow::FixedSizeBinaryType*>(field->type().get());
- length = fixed_size_binary_type->byte_width();
+ const auto& fixed_size_binary_type =
+ static_cast<const ::arrow::FixedSizeBinaryType&>(*field->type());
+ length = fixed_size_binary_type.byte_width();
+ } break;
+ case ArrowType::DECIMAL: {
+ type = ParquetType::FIXED_LEN_BYTE_ARRAY;
+ logical_type = LogicalType::DECIMAL;
+ const auto& decimal_type =
+ static_cast<const ::arrow::Decimal128Type&>(*field->type());
+ precision = decimal_type.precision();
+ scale = decimal_type.scale();
+ length = DecimalSize(precision);
} break;
case ArrowType::DATE32:
type = ParquetType::INT32;
@@ -565,12 +577,12 @@
auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
return StructToNode(struct_type, field->name(), field->nullable(), properties,
arrow_properties, out);
- } break;
+ }
case ArrowType::LIST: {
auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type());
return ListToNode(list_type, field->name(), field->nullable(), properties,
arrow_properties, out);
- } break;
+ }
case ArrowType::DICTIONARY: {
// Parquet has no Dictionary type, dictionary-encoded is handled on
// the encoding, not the schema level.
@@ -582,14 +594,15 @@
return FieldToNode(unpacked_field, properties, arrow_properties, out);
}
default: {
- // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
+ // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR
std::stringstream ss;
ss << "Unhandled type for Arrow to Parquet schema conversion: ";
ss << field->type()->ToString();
return Status::NotImplemented(ss.str());
}
}
- *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, length);
+ *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, length,
+ precision, scale);
return Status::OK();
}
@@ -617,5 +630,77 @@
out);
}
+/// \brief Compute the number of bytes required to represent a decimal of a
+/// given precision. Taken from the Apache Impala codebase. The comments next
+/// to the return values are the maximum value that can be represented in 2's
+/// complement with the returned number of bytes.
+int32_t DecimalSize(int32_t precision) {
+ DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
+ << precision;
+ DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
+ << precision;
+
+ switch (precision) {
+ case 1:
+ case 2:
+ return 1; // 127
+ case 3:
+ case 4:
+ return 2; // 32,767
+ case 5:
+ case 6:
+ return 3; // 8,388,607
+ case 7:
+ case 8:
+ case 9:
+ return 4; // 2,147,483,427
+ case 10:
+ case 11:
+ return 5; // 549,755,813,887
+ case 12:
+ case 13:
+ case 14:
+ return 6; // 140,737,488,355,327
+ case 15:
+ case 16:
+ return 7; // 36,028,797,018,963,967
+ case 17:
+ case 18:
+ return 8; // 9,223,372,036,854,775,807
+ case 19:
+ case 20:
+ case 21:
+ return 9; // 2,361,183,241,434,822,606,847
+ case 22:
+ case 23:
+ return 10; // 604,462,909,807,314,587,353,087
+ case 24:
+ case 25:
+ case 26:
+ return 11; // 154,742,504,910,672,534,362,390,527
+ case 27:
+ case 28:
+ return 12; // 39,614,081,257,132,168,796,771,975,167
+ case 29:
+ case 30:
+ case 31:
+ return 13; // 10,141,204,801,825,835,211,973,625,643,007
+ case 32:
+ case 33:
+ return 14; // 2,596,148,429,267,413,814,265,248,164,610,047
+ case 34:
+ case 35:
+ return 15; // 664,613,997,892,457,936,451,903,530,140,172,287
+ case 36:
+ case 37:
+ case 38:
+ return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727
+ default:
+ DCHECK(false);
+ break;
+ }
+ return -1;
+}
+
} // namespace arrow
} // namespace parquet
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index de153eb..3b212da 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -85,6 +85,8 @@
const WriterProperties& properties,
std::shared_ptr<SchemaDescriptor>* out);
+int32_t DecimalSize(int32_t precision);
+
} // namespace arrow
} // namespace parquet
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 954a84f..8611a30 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+#include <limits>
#include <string>
#include <vector>
#include "arrow/api.h"
#include "arrow/test-util.h"
#include "arrow/type_traits.h"
+#include "arrow/util/decimal.h"
namespace parquet {
namespace arrow {
@@ -28,6 +30,16 @@
using ::arrow::Array;
using ::arrow::Status;
+template <int32_t PRECISION>
+struct DecimalWithPrecisionAndScale {
+ static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value");
+
+ using type = ::arrow::Decimal128Type;
+ static constexpr ::arrow::Type::type type_id = ::arrow::Decimal128Type::type_id;
+ static constexpr int32_t precision = PRECISION;
+ static constexpr int32_t scale = PRECISION - 1;
+};
+
template <typename ArrowType>
using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
@@ -52,8 +64,10 @@
template <class ArrowType>
typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
- std::vector<typename ArrowType::c_type> values;
- ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values);
+ using c_type = typename ArrowType::c_type;
+ std::vector<c_type> values;
+ ::arrow::test::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1),
+ &values);
::arrow::NumericBuilder<ArrowType> builder;
RETURN_NOT_OK(builder.Append(values.data(), values.size()));
return builder.Finish(out);
@@ -64,7 +78,7 @@
is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
NonNullArray(size_t size, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;
- ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+ ::arrow::test::randint(size, 0, 64, &values);
// Passing data type so this will work with TimestampType too
::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
@@ -77,7 +91,7 @@
typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;
- ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+ ::arrow::test::randint(size, 0, 64, &values);
for (size_t i = 0; i < size; i++) {
values[i] *= 86400000;
}
@@ -114,11 +128,54 @@
return builder.Finish(out);
}
+static inline void random_decimals(int64_t n, uint32_t seed, int32_t precision,
+ uint8_t* out) {
+ std::mt19937 gen(seed);
+ std::uniform_int_distribution<uint32_t> d(0, std::numeric_limits<uint8_t>::max());
+ const int32_t required_bytes = DecimalSize(precision);
+ constexpr int32_t byte_width = 16;
+ std::fill(out, out + byte_width * n, '\0');
+
+ for (int64_t i = 0; i < n; ++i, out += byte_width) {
+ std::generate(out, out + required_bytes,
+ [&d, &gen] { return static_cast<uint8_t>(d(gen)); });
+
+ // sign extend if the sign bit is set for the last byte generated
+ // 0b10000000 == 0x80 == 128
+ if ((out[required_bytes - 1] & '\x80') != 0) {
+ std::fill(out + required_bytes, out + byte_width, '\xFF');
+ }
+ }
+}
+
+template <typename ArrowType, int32_t precision = ArrowType::precision>
+typename std::enable_if<
+ std::is_same<ArrowType, DecimalWithPrecisionAndScale<precision>>::value, Status>::type
+NonNullArray(size_t size, std::shared_ptr<Array>* out) {
+ constexpr int32_t kDecimalPrecision = precision;
+ constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale<precision>::scale;
+
+ const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale);
+ ::arrow::Decimal128Builder builder(type);
+ const int32_t byte_width =
+ static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+ constexpr int32_t seed = 0;
+
+ std::shared_ptr<Buffer> out_buf;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width,
+ &out_buf));
+ random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
+
+ RETURN_NOT_OK(builder.Append(out_buf->data(), size));
+ return builder.Finish(out);
+}
+
template <class ArrowType>
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
std::vector<uint8_t> values;
- ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+ ::arrow::test::randint(size, 0, 1, &values);
::arrow::BooleanBuilder builder;
RETURN_NOT_OK(builder.Append(values.data(), values.size()));
return builder.Finish(out);
@@ -128,9 +185,10 @@
template <typename ArrowType>
typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
- std::vector<typename ArrowType::c_type> values;
- ::arrow::test::random_real<typename ArrowType::c_type>(size, seed, -1e10, 1e10,
- &values);
+ using c_type = typename ArrowType::c_type;
+ std::vector<c_type> values;
+ ::arrow::test::random_real(size, seed, static_cast<c_type>(-1e10),
+ static_cast<c_type>(1e10), &values);
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
@@ -151,7 +209,7 @@
// Seed is random in Arrow right now
(void)seed;
- ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+ ::arrow::test::randint(size, 0, 64, &values);
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
@@ -172,7 +230,7 @@
// Seed is random in Arrow right now
(void)seed;
- ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+ ::arrow::test::randint(size, 0, 64, &values);
for (size_t i = 0; i < size; i++) {
values[i] *= 86400000;
}
@@ -246,6 +304,34 @@
return builder.Finish(out);
}
+template <typename ArrowType, int32_t precision = ArrowType::precision>
+typename std::enable_if<
+ std::is_same<ArrowType, DecimalWithPrecisionAndScale<precision>>::value, Status>::type
+NullableArray(size_t size, size_t num_nulls, uint32_t seed,
+ std::shared_ptr<::arrow::Array>* out) {
+ std::vector<uint8_t> valid_bytes(size, '\1');
+
+ for (size_t i = 0; i < num_nulls; ++i) {
+ valid_bytes[i * 2] = '\0';
+ }
+
+ constexpr int32_t kDecimalPrecision = precision;
+ constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale<precision>::scale;
+ const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale);
+ const int32_t byte_width =
+ static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+ std::shared_ptr<::arrow::Buffer> out_buf;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width,
+ &out_buf));
+
+ random_decimals(size, seed, precision, out_buf->mutable_data());
+
+ ::arrow::Decimal128Builder builder(type);
+ RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data()));
+ return builder.Finish(out);
+}
+
// This helper function only supports (size/2) nulls yet.
template <class ArrowType>
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(
@@ -255,7 +341,7 @@
// Seed is random in Arrow right now
(void)seed;
- ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+ ::arrow::test::randint(size, 0, 1, &values);
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index b53c1ca..1f3fc7e 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -32,6 +32,7 @@
using arrow::Array;
using arrow::BinaryArray;
using arrow::FixedSizeBinaryArray;
+using arrow::Decimal128Array;
using arrow::BooleanArray;
using arrow::Int16Array;
using arrow::Int16Builder;
@@ -104,7 +105,6 @@
NOT_IMPLEMENTED_VISIT(Struct)
NOT_IMPLEMENTED_VISIT(Union)
- NOT_IMPLEMENTED_VISIT(Decimal)
NOT_IMPLEMENTED_VISIT(Dictionary)
NOT_IMPLEMENTED_VISIT(Interval)
@@ -743,8 +743,6 @@
buffer_ptr[i] =
ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
} else {
int buffer_idx = 0;
for (int64_t i = 0; i < data->length(); i++) {
@@ -753,9 +751,9 @@
ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
}
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
@@ -765,29 +763,82 @@
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels) {
RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false));
- auto data = static_cast<const FixedSizeBinaryArray*>(array.get());
+ const auto& data = static_cast<const FixedSizeBinaryArray&>(*array);
+ const int64_t length = data.length();
+
auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
- if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
+ if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) {
// no nulls, just dump the data
// todo(advancedxy): use a writeBatch to avoid this step
- for (int64_t i = 0; i < data->length(); i++) {
- buffer_ptr[i] = FixedLenByteArray(data->GetValue(i));
+ for (int64_t i = 0; i < length; i++) {
+ buffer_ptr[i] = FixedLenByteArray(data.GetValue(i));
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
} else {
int buffer_idx = 0;
- for (int64_t i = 0; i < data->length(); i++) {
- if (!data->IsNull(i)) {
- buffer_ptr[buffer_idx++] = FixedLenByteArray(data->GetValue(i));
+ for (int64_t i = 0; i < length; i++) {
+ if (!data.IsNull(i)) {
+ buffer_ptr[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
}
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+ PARQUET_CATCH_NOT_OK(writer->Close());
+ return Status::OK();
+}
+
+template <>
+Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
+ ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels) {
+ const auto& data = static_cast<const Decimal128Array&>(*array);
+ const int64_t length = data.length();
+
+ // TODO(phillipc): This is potentially very wasteful if we have a lot of nulls
+ std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
+
+ RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false));
+ auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
+
+ auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
+
+ const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
+ const int32_t offset =
+ decimal_type.byte_width() - DecimalSize(decimal_type.precision());
+
+ const bool does_not_have_nulls =
+ writer->descr()->schema_node()->is_required() || data.null_count() == 0;
+
+ // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
+ // don't have to keep writing two loops to handle the case where we know there are no
+ // nulls
+ if (does_not_have_nulls) {
+ // no nulls, just dump the data
+ // todo(advancedxy): use a writeBatch to avoid this step
+ for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
+ auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
+ big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+ big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+ buffer_ptr[i] = FixedLenByteArray(
+ reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+ }
+ } else {
+ for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
+ if (!data.IsNull(i)) {
+ auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
+ big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+ big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+ buffer_ptr[buffer_idx++] = FixedLenByteArray(
+ reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+ j += 2;
+ }
+ }
+ }
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
@@ -896,6 +947,7 @@
WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
+ WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType)
WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index be38752..3284aca 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -420,11 +420,9 @@
PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false));
uint8_t* bytes_data = byte_array_data_->mutable_data();
- int offset = 0;
- for (int i = 0; i < num_dictionary_values; ++i) {
+ for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += fixed_len) {
memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len);
dictionary_[i].ptr = bytes_data + offset;
- offset += fixed_len;
}
}
@@ -597,7 +595,7 @@
template <>
inline int DictEncoder<ByteArrayType>::Hash(const ByteArray& value) const {
if (value.len > 0) {
- DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL";
+ DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL";
}
return HashUtil::Hash(value.ptr, value.len, 0);
}
@@ -605,7 +603,7 @@
template <>
inline int DictEncoder<FLBAType>::Hash(const FixedLenByteArray& value) const {
if (type_length_ > 0) {
- DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL";
+ DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL";
}
return HashUtil::Hash(value.ptr, type_length_, 0);
}
@@ -923,7 +921,8 @@
::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
: Decoder<ByteArrayType>(descr, Encoding::DELTA_BYTE_ARRAY),
prefix_len_decoder_(nullptr, pool),
- suffix_decoder_(nullptr, pool) {}
+ suffix_decoder_(nullptr, pool),
+ last_value_(0, nullptr) {}
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 9b9bde9..4ec48a4 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -45,9 +45,9 @@
: contents_(std::move(contents)) {}
std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
- DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
- << metadata()->num_columns()
- << "columns, requested column: " << i;
+ DCHECK(i < metadata()->num_columns())
+ << "The RowGroup only has " << metadata()->num_columns()
+ << "columns, requested column: " << i;
const ColumnDescriptor* descr = metadata()->schema()->Column(i);
std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
@@ -57,9 +57,9 @@
}
std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
- DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
- << metadata()->num_columns()
- << "columns, requested column: " << i;
+ DCHECK(i < metadata()->num_columns())
+ << "The RowGroup only has " << metadata()->num_columns()
+ << "columns, requested column: " << i;
return contents_->GetColumnPageReader(i);
}
@@ -127,9 +127,9 @@
}
std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
- DCHECK(i < metadata()->num_row_groups()) << "The file only has "
- << metadata()->num_row_groups()
- << "row groups, requested reader for: " << i;
+ DCHECK(i < metadata()->num_row_groups())
+ << "The file only has " << metadata()->num_row_groups()
+ << "row groups, requested reader for: " << i;
return contents_->GetRowGroup(i);
}
diff --git a/src/parquet/types.h b/src/parquet/types.h
index af3a58f..53b33d5 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -21,6 +21,7 @@
#include <algorithm>
#include <cstdint>
#include <cstring>
+#include <iterator>
#include <sstream>
#include <string>
@@ -136,79 +137,63 @@
ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {}
uint32_t len;
const uint8_t* ptr;
-
- bool operator==(const ByteArray& other) const {
- return this->len == other.len && 0 == memcmp(this->ptr, other.ptr, this->len);
- }
-
- bool operator!=(const ByteArray& other) const {
- return this->len != other.len || 0 != memcmp(this->ptr, other.ptr, this->len);
- }
};
+inline bool operator==(const ByteArray& left, const ByteArray& right) {
+ return left.len == right.len && std::equal(left.ptr, left.ptr + left.len, right.ptr);
+}
+
+inline bool operator!=(const ByteArray& left, const ByteArray& right) {
+ return !(left == right);
+}
+
struct FixedLenByteArray {
FixedLenByteArray() : ptr(nullptr) {}
explicit FixedLenByteArray(const uint8_t* ptr) : ptr(ptr) {}
const uint8_t* ptr;
};
-typedef FixedLenByteArray FLBA;
+using FLBA = FixedLenByteArray;
-MANUALLY_ALIGNED_STRUCT(1) Int96 {
- uint32_t value[3];
-
- bool operator==(const Int96& other) const {
- return 0 == memcmp(this->value, other.value, 3 * sizeof(uint32_t));
- }
-
- bool operator!=(const Int96& other) const { return !(*this == other); }
-};
+MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; };
STRUCT_END(Int96, 12);
+inline bool operator==(const Int96& left, const Int96& right) {
+ return std::equal(left.value, left.value + 3, right.value);
+}
+
+inline bool operator!=(const Int96& left, const Int96& right) { return !(left == right); }
+
static inline std::string ByteArrayToString(const ByteArray& a) {
return std::string(reinterpret_cast<const char*>(a.ptr), a.len);
}
static inline std::string Int96ToString(const Int96& a) {
- std::stringstream result;
- for (int i = 0; i < 3; i++) {
- result << a.value[i] << " ";
- }
+ std::ostringstream result;
+ std::copy(a.value, a.value + 3, std::ostream_iterator<uint32_t>(result, " "));
return result.str();
}
static inline std::string FixedLenByteArrayToString(const FixedLenByteArray& a, int len) {
- const uint8_t* bytes = reinterpret_cast<const uint8_t*>(a.ptr);
- std::stringstream result;
- for (int i = 0; i < len; i++) {
- result << (uint32_t)bytes[i] << " ";
- }
+ std::ostringstream result;
+ std::copy(a.ptr, a.ptr + len, std::ostream_iterator<uint32_t>(result, " "));
return result.str();
}
-static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
- uint32_t len = std::min(x1.len, x2.len);
- int cmp = memcmp(x1.ptr, x2.ptr, len);
- if (cmp != 0) return cmp;
- if (len < x1.len) return 1;
- if (len < x2.len) return -1;
- return 0;
-}
-
-template <int TYPE>
+template <Type::type TYPE>
struct type_traits {};
template <>
struct type_traits<Type::BOOLEAN> {
- typedef bool value_type;
- static constexpr int value_byte_size = 1;
+ using value_type = bool;
+ static constexpr int value_byte_size = 1;
static constexpr const char* printf_code = "d";
};
template <>
struct type_traits<Type::INT32> {
- typedef int32_t value_type;
+ using value_type = int32_t;
static constexpr int value_byte_size = 4;
static constexpr const char* printf_code = "d";
@@ -216,7 +201,7 @@
template <>
struct type_traits<Type::INT64> {
- typedef int64_t value_type;
+ using value_type = int64_t;
static constexpr int value_byte_size = 8;
static constexpr const char* printf_code = "ld";
@@ -224,7 +209,7 @@
template <>
struct type_traits<Type::INT96> {
- typedef Int96 value_type;
+ using value_type = Int96;
static constexpr int value_byte_size = 12;
static constexpr const char* printf_code = "s";
@@ -232,7 +217,7 @@
template <>
struct type_traits<Type::FLOAT> {
- typedef float value_type;
+ using value_type = float;
static constexpr int value_byte_size = 4;
static constexpr const char* printf_code = "f";
@@ -240,7 +225,7 @@
template <>
struct type_traits<Type::DOUBLE> {
- typedef double value_type;
+ using value_type = double;
static constexpr int value_byte_size = 8;
static constexpr const char* printf_code = "lf";
@@ -248,7 +233,7 @@
template <>
struct type_traits<Type::BYTE_ARRAY> {
- typedef ByteArray value_type;
+ using value_type = ByteArray;
static constexpr int value_byte_size = sizeof(ByteArray);
static constexpr const char* printf_code = "s";
@@ -256,7 +241,7 @@
template <>
struct type_traits<Type::FIXED_LEN_BYTE_ARRAY> {
- typedef FixedLenByteArray value_type;
+ using value_type = FixedLenByteArray;
static constexpr int value_byte_size = sizeof(FixedLenByteArray);
static constexpr const char* printf_code = "s";
@@ -264,18 +249,18 @@
template <Type::type TYPE>
struct DataType {
+ using c_type = typename type_traits<TYPE>::value_type;
static constexpr Type::type type_num = TYPE;
- typedef typename type_traits<TYPE>::value_type c_type;
};
-typedef DataType<Type::BOOLEAN> BooleanType;
-typedef DataType<Type::INT32> Int32Type;
-typedef DataType<Type::INT64> Int64Type;
-typedef DataType<Type::INT96> Int96Type;
-typedef DataType<Type::FLOAT> FloatType;
-typedef DataType<Type::DOUBLE> DoubleType;
-typedef DataType<Type::BYTE_ARRAY> ByteArrayType;
-typedef DataType<Type::FIXED_LEN_BYTE_ARRAY> FLBAType;
+using BooleanType = DataType<Type::BOOLEAN>;
+using Int32Type = DataType<Type::INT32>;
+using Int64Type = DataType<Type::INT64>;
+using Int96Type = DataType<Type::INT96>;
+using FloatType = DataType<Type::FLOAT>;
+using DoubleType = DataType<Type::DOUBLE>;
+using ByteArrayType = DataType<Type::BYTE_ARRAY>;
+using FLBAType = DataType<Type::FIXED_LEN_BYTE_ARRAY>;
template <typename Type>
inline std::string format_fwf(int width) {