| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "parquet/chunker_internal.h" |
| |
| #include <cstdint> |
| #include <iterator> |
| #include <string> |
| #include <vector> |
| |
| #include "arrow/array.h" |
| #include "arrow/util/bit_util.h" |
| #include "arrow/util/logging_internal.h" |
| #include "arrow/visit_type_inline.h" |
| #include "parquet/chunker_internal_generated.h" |
| #include "parquet/exception.h" |
| #include "parquet/level_conversion.h" |
| |
| namespace parquet::internal { |
| |
| using ::arrow::internal::checked_cast; |
| |
| static_assert(std::size(kGearhashTable) == kNumGearhashTables, |
| "should update CDC code to reflect number of generated hash tables"); |
| static_assert(sizeof(kGearhashTable) == kNumGearhashTables * 256 * 8, |
| "each table should have 256 entries of 64 bit values"); |
| |
| namespace { |
| |
| /// Calculate the mask to compare the rolling hash against; the mask is used to decide |
| /// whether a new chunk should be started based on the rolling hash value. It is |
| /// derived from the min_chunk_size, max_chunk_size and norm_level parameters. |
| /// |
| /// Assuming that the gear hash produces uniformly distributed random values, each bit |
| /// in the actual value of rolling_hash_ has an equal probability of being set, so a |
| /// mask with the top N bits set matches the rolling hash with probability 1/2^N. This |
| /// is the cut-point criterion of the original gear-hash-based content-defined |
| /// chunking. The main drawback of this approach is the non-uniform distribution of |
| /// the chunk sizes. |
| /// |
| /// FastCDC later improved the process by introducing: |
| /// - sub-minimum chunk cut-point skipping (not hashing the first `min_chunk_size` bytes) |
| /// - chunk size normalization (using two masks) |
| /// |
| /// This implementation uses cut-point skipping because it improves the overall |
| /// performance, and a more accurate alternative to normalization to obtain a less |
| /// skewed chunk size distribution: instead of using two different masks (one with a |
| /// lower and one with a higher probability of matching, switched based on the actual |
| /// chunk size), we use 8 different gear hash tables and require 8 consecutive matches, |
| /// switching to the next table after each match. Based on the central limit theorem, |
| /// this approximates a normal distribution of the chunk sizes. |
| /// |
| /// @param min_chunk_size The minimum chunk size (default 256KiB) |
| /// @param max_chunk_size The maximum chunk size (default 1MiB) |
| /// @param norm_level Normalization level (default 0) |
| /// @return The mask used to compare against the rolling hash |
| uint64_t CalculateMask(int64_t min_chunk_size, int64_t max_chunk_size, int norm_level) { |
| if (min_chunk_size < 0) { |
| throw ParquetException("min_chunk_size must be non-negative"); |
| } |
| if (max_chunk_size <= min_chunk_size) { |
| throw ParquetException("max_chunk_size must be greater than min_chunk_size"); |
| } |
| |
| // calculate the average size of the chunks |
| int64_t avg_chunk_size = (min_chunk_size + max_chunk_size) / 2; |
| // since we are skipping the first `min_chunk_size` bytes for each chunk, we need to |
| // target a smaller chunk size to reach the average size after skipping the first |
| // `min_chunk_size` bytes; also divide by the number of gearhash tables to get a |
| // more Gaussian-like distribution |
| int64_t target_size = (avg_chunk_size - min_chunk_size) / kNumGearhashTables; |
| |
| // assuming that the gear hash has a uniform distribution, we can calculate the number |
| // of mask bits as floor(log2(target_size)) |
| int mask_bits = std::max(0, ::arrow::bit_util::NumRequiredBits(target_size) - 1); |
| |
| // a user defined `norm_level` can be used to adjust the mask size, hence the matching |
| // probability, by increasing the norm_level we increase the probability of matching |
| // the mask, forcing the distribution closer to the average size; norm_level is 0 by |
| // default |
| int effective_bits = mask_bits - norm_level; |
| |
| if (effective_bits < 1 || effective_bits > 63) { |
| throw ParquetException( |
| "The number of bits in the CDC mask must be between 1 and 63, got " + |
| std::to_string(effective_bits)); |
| } else { |
| // create the mask by setting the top bits |
| return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); |
| } |
| } |
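| |
| // Worked example, assuming the default parameters documented above (min_chunk_size |
| // of 256 KiB, max_chunk_size of 1 MiB, norm_level of 0) and kNumGearhashTables == 8: |
| // |
| //   avg_chunk_size = (262144 + 1048576) / 2     = 655360 |
| //   target_size    = (655360 - 262144) / 8      = 49152 |
| //   mask_bits      = NumRequiredBits(49152) - 1 = 15 |
| //   mask           = UINT64_MAX << (64 - 15)    = 0xFFFE000000000000 |
| // |
| // With the top 15 bits set, each byte rolled matches the mask with probability |
| // 1/2^15, so a single run is expected to match after roughly 32 KiB, and the 8 |
| // consecutive runs required for a cut put the average chunk size at roughly |
| // 256 KiB + 8 * 32 KiB = 512 KiB, between the configured minimum and maximum. |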
| |
| } // namespace |
| |
| class ContentDefinedChunker::Impl { |
| public: |
| Impl(const LevelInfo& level_info, int64_t min_chunk_size, int64_t max_chunk_size, |
| int norm_level) |
| : level_info_(level_info), |
| min_chunk_size_(min_chunk_size), |
| max_chunk_size_(max_chunk_size), |
| rolling_hash_mask_(CalculateMask(min_chunk_size, max_chunk_size, norm_level)) {} |
| |
| uint64_t GetRollingHashMask() const { return rolling_hash_mask_; } |
| |
| void Roll(bool value) { |
| // Update the rolling hash with a boolean value, set has_matched_ to true if the |
| // hash matches the mask. |
| if (++chunk_size_ < min_chunk_size_) { |
| // short-circuit if we haven't reached the minimum chunk size; this speeds up the |
| // chunking process since the gearhash doesn't need to be updated |
| return; |
| } |
| rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value]; |
| has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0); |
| } |
| |
| template <int kByteWidth> |
| void Roll(const uint8_t* value) { |
| // Update the rolling hash with a compile-time known sized value, set has_matched_ to |
| // true if the hash matches the mask. |
| chunk_size_ += kByteWidth; |
| if (chunk_size_ < min_chunk_size_) { |
| // short-circuit if we haven't reached the minimum chunk size; this speeds up the |
| // chunking process since the gearhash doesn't need to be updated |
| return; |
| } |
| for (int i = 0; i < kByteWidth; ++i) { |
| rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; |
| has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0); |
| } |
| } |
| |
| template <typename T> |
| void Roll(const T* value) { |
| return Roll<sizeof(T)>(reinterpret_cast<const uint8_t*>(value)); |
| } |
| |
| void Roll(const uint8_t* value, int64_t length) { |
| // Update the rolling hash with a binary-like value, set has_matched_ to true if the |
| // hash matches the mask. |
| chunk_size_ += length; |
| if (chunk_size_ < min_chunk_size_) { |
| // short-circuit if we haven't reached the minimum chunk size; this speeds up the |
| // chunking process since the gearhash doesn't need to be updated |
| return; |
| } |
| for (int64_t i = 0; i < length; ++i) { |
| rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; |
| has_matched_ = has_matched_ || ((rolling_hash_ & rolling_hash_mask_) == 0); |
| } |
| } |
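| |
| // The Roll() overloads above all update the hash with the same gear hash recurrence |
| // (the boolean overload uses the value itself as the table index). A minimal |
| // standalone sketch of that recurrence, for illustration only: |
| // |
| //   uint64_t hash = 0; |
| //   for (uint8_t byte : bytes) { |
| //     hash = (hash << 1) + kGearhashTable[run][byte]; |
| //   } |
| // |
| // Because every update shifts the hash left by one bit, a byte's contribution is |
| // fully shifted out after 64 updates, so the hash value only depends on the last |
| // 64 bytes rolled; this is what makes it a rolling hash. |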
| |
| bool NeedNewChunk() { |
| // decide whether to create a new chunk based on the rolling hash; has_matched_ is |
| // set to true if we encountered a match since the last NeedNewChunk() call |
| if (ARROW_PREDICT_FALSE(has_matched_)) { |
| has_matched_ = false; |
| // in order to have a normal distribution of chunk sizes, we only create a new chunk |
| // if the adjusted mask matches the rolling hash 8 times in a row, each run using a |
| // different gearhash table (gearhash's chunk size has a geometric distribution, and |
| // we use the central limit theorem to approximate a normal distribution, see |
| // section 6.2.1 in the paper https://www.cidrdb.org/cidr2023/papers/p43-low.pdf) |
| if (ARROW_PREDICT_FALSE(++nth_run_ >= kNumGearhashTables)) { |
| // note that we choose not to reset the rolling hash state here, nor anywhere else |
| // in the code; in practice this doesn't seem to affect the chunking effectiveness |
| nth_run_ = 0; |
| chunk_size_ = 0; |
| return true; |
| } |
| } |
| if (ARROW_PREDICT_FALSE(chunk_size_ >= max_chunk_size_)) { |
| // we have a hard limit on the maximum chunk size; note that we don't reset the |
| // rolling hash state here, so the next NeedNewChunk() call will continue from the |
| // current state |
| chunk_size_ = 0; |
| return true; |
| } |
| return false; |
| } |
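| |
| // Sketch of the resulting chunk size distribution, under the uniform-hash assumption |
| // described in CalculateMask(): the number of bytes a single run needs until a mask |
| // match follows a geometric distribution with mean ~2^N, where N is the number of |
| // mask bits, and a cut requires kNumGearhashTables consecutive matching runs, so the |
| // chunk length beyond min_chunk_size_ is a sum of kNumGearhashTables independent |
| // geometric variables. By the central limit theorem that sum is approximately normal |
| // around kNumGearhashTables * 2^N, which concentrates the chunk sizes between |
| // min_chunk_size_ and max_chunk_size_. |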
| |
| void ValidateChunks(const std::vector<Chunk>& chunks, int64_t num_levels) const { |
| // chunks must be non-empty and monotonically increasing |
| ARROW_DCHECK(!chunks.empty()); |
| |
| // the first chunk must start at the first level |
| auto first_chunk = chunks.front(); |
| ARROW_DCHECK_EQ(first_chunk.level_offset, 0); |
| ARROW_DCHECK_EQ(first_chunk.value_offset, 0); |
| |
| // the following chunks must be contiguous, non-overlapping and monotonically |
| // increasing |
| auto sum_levels = first_chunk.levels_to_write; |
| for (size_t i = 1; i < chunks.size(); ++i) { |
| auto chunk = chunks[i]; |
| auto prev_chunk = chunks[i - 1]; |
| ARROW_DCHECK_GT(chunk.levels_to_write, 0); |
| ARROW_DCHECK_GE(chunk.value_offset, prev_chunk.value_offset); |
| ARROW_DCHECK_EQ(chunk.level_offset, |
| prev_chunk.level_offset + prev_chunk.levels_to_write); |
| sum_levels += chunk.levels_to_write; |
| } |
| ARROW_DCHECK_EQ(sum_levels, num_levels); |
| |
| // the last chunk must end at the last level |
| auto last_chunk = chunks.back(); |
| ARROW_DCHECK_EQ(last_chunk.level_offset + last_chunk.levels_to_write, num_levels); |
| } |
| |
| template <typename RollFunc> |
| std::vector<Chunk> Calculate(const int16_t* def_levels, const int16_t* rep_levels, |
| int64_t num_levels, const RollFunc& RollValue) { |
| // Calculate the chunk boundaries for typed Arrow arrays. |
| // |
| // The chunking state is maintained across the entire column without being reset |
| // between pages and row groups. This allows the chunking process to continue |
| // seamlessly across multiple WriteArrow calls. |
| // |
| // Below we go over the (def_level, rep_level, value) triplets one by one while |
| // adjusting the column-global rolling hash based on the triplet. Whenever the |
| // rolling hash matches a predefined mask it sets the `has_matched_` flag to true. |
| // |
| // After each triplet, NeedNewChunk() is called to evaluate whether a new chunk |
| // should be started. If the rolling hash matches the mask `kNumGearhashTables` times |
| // in a row (required for a better chunk size distribution) and the chunk size |
| // requirements are satisfied, a new chunk is created. See the `NeedNewChunk()` |
| // method for more details. |
| std::vector<Chunk> chunks; |
| int64_t offset; |
| int64_t prev_offset = 0; |
| int64_t prev_value_offset = 0; |
| bool has_def_levels = level_info_.def_level > 0; |
| bool has_rep_levels = level_info_.rep_level > 0; |
| |
| if (!has_rep_levels && !has_def_levels) { |
| // fastest path for non-nested non-null data |
| for (offset = 0; offset < num_levels; ++offset) { |
| RollValue(offset); |
| if (NeedNewChunk()) { |
| chunks.push_back({prev_offset, prev_offset, offset - prev_offset}); |
| prev_offset = offset; |
| } |
| } |
| // set the previous value offset to add the last chunk |
| prev_value_offset = prev_offset; |
| } else if (!has_rep_levels) { |
| // non-nested data with nulls |
| int16_t def_level; |
| for (offset = 0; offset < num_levels; ++offset) { |
| def_level = def_levels[offset]; |
| |
| Roll(&def_level); |
| if (def_level == level_info_.def_level) { |
| RollValue(offset); |
| } |
| if (NeedNewChunk()) { |
| chunks.push_back({prev_offset, prev_offset, offset - prev_offset}); |
| prev_offset = offset; |
| } |
| } |
| // set the previous value offset to add the last chunk |
| prev_value_offset = prev_offset; |
| } else { |
| // nested data with nulls |
| int16_t def_level; |
| int16_t rep_level; |
| int64_t value_offset = 0; |
| |
| for (offset = 0; offset < num_levels; ++offset) { |
| def_level = def_levels[offset]; |
| rep_level = rep_levels[offset]; |
| |
| Roll(&def_level); |
| Roll(&rep_level); |
| if (def_level == level_info_.def_level) { |
| RollValue(value_offset); |
| } |
| |
| if (rep_level == 0 && NeedNewChunk()) { |
| // we are at a record boundary and the hash criteria are met, so close the current |
| // chunk unless it would be empty |
| auto levels_to_write = offset - prev_offset; |
| if (levels_to_write > 0) { |
| chunks.push_back({prev_offset, prev_value_offset, levels_to_write}); |
| prev_offset = offset; |
| prev_value_offset = value_offset; |
| } |
| } |
| if (def_level >= level_info_.repeated_ancestor_def_level) { |
| // we only increment the value offset if we have a leaf value |
| ++value_offset; |
| } |
| } |
| } |
| |
| // add the last chunk if we have any levels left |
| if (prev_offset < num_levels) { |
| chunks.push_back({prev_offset, prev_value_offset, num_levels - prev_offset}); |
| } |
| #ifndef NDEBUG |
| ValidateChunks(chunks, num_levels); |
| #endif |
| |
| return chunks; |
| } |
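| |
| // Illustration of the produced chunk layout with hypothetical numbers: for a flat, |
| // non-nullable column with num_levels == 1000 and a single content-defined cut after |
| // level 400, the result would be two chunks: |
| // |
| //   {level_offset: 0,   value_offset: 0,   levels_to_write: 400} |
| //   {level_offset: 400, value_offset: 400, levels_to_write: 600} |
| // |
| // For nested columns value_offset counts leaf values, so it can differ from |
| // level_offset, and cuts are only made at record boundaries (rep_level == 0). |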
| |
| template <int kByteWidth> |
| std::vector<Chunk> CalculateFixedWidth(const int16_t* def_levels, |
| const int16_t* rep_levels, int64_t num_levels, |
| const ::arrow::Array& values) { |
| const uint8_t* raw_values = |
| values.data()->GetValues<uint8_t>(/*i=*/1, /*absolute_offset=*/0) + |
| values.offset() * kByteWidth; |
| return Calculate(def_levels, rep_levels, num_levels, [&](int64_t i) { |
| return Roll<kByteWidth>(&raw_values[i * kByteWidth]); |
| }); |
| } |
| |
| template <typename ArrayType> |
| std::vector<Chunk> CalculateBinaryLike(const int16_t* def_levels, |
| const int16_t* rep_levels, int64_t num_levels, |
| const ::arrow::Array& values) { |
| const auto& array = checked_cast<const ArrayType&>(values); |
| return Calculate(def_levels, rep_levels, num_levels, [&](int64_t i) { |
| typename ArrayType::offset_type length; |
| const uint8_t* value = array.GetValue(i, &length); |
| Roll(value, length); |
| }); |
| } |
| |
| std::vector<Chunk> GetChunks(const int16_t* def_levels, const int16_t* rep_levels, |
| int64_t num_levels, const ::arrow::Array& values) { |
| auto handle_type = [&](auto&& type) -> std::vector<Chunk> { |
| using ArrowType = std::decay_t<decltype(type)>; |
| if constexpr (ArrowType::type_id == ::arrow::Type::NA) { |
| return Calculate(def_levels, rep_levels, num_levels, [](int64_t) {}); |
| } else if constexpr (ArrowType::type_id == ::arrow::Type::BOOL) { |
| const auto& array = static_cast<const ::arrow::BooleanArray&>(values); |
| return Calculate(def_levels, rep_levels, num_levels, |
| [&](int64_t i) { Roll(array.Value(i)); }); |
| } else if constexpr (ArrowType::type_id == ::arrow::Type::FIXED_SIZE_BINARY) { |
| const auto& array = static_cast<const ::arrow::FixedSizeBinaryArray&>(values); |
| const auto byte_width = array.byte_width(); |
| return Calculate(def_levels, rep_levels, num_levels, |
| [&](int64_t i) { Roll(array.GetValue(i), byte_width); }); |
| } else if constexpr (ArrowType::type_id == ::arrow::Type::EXTENSION) { |
| const auto& array = static_cast<const ::arrow::ExtensionArray&>(values); |
| return GetChunks(def_levels, rep_levels, num_levels, *array.storage()); |
| } else if constexpr (::arrow::is_primitive(ArrowType::type_id)) { |
| using c_type = typename ArrowType::c_type; |
| return CalculateFixedWidth<sizeof(c_type)>(def_levels, rep_levels, num_levels, |
| values); |
| } else if constexpr (::arrow::is_decimal(ArrowType::type_id)) { |
| return CalculateFixedWidth<ArrowType::kByteWidth>(def_levels, rep_levels, |
| num_levels, values); |
| } else if constexpr (::arrow::is_binary_like(ArrowType::type_id)) { |
| return CalculateBinaryLike<::arrow::BinaryArray>(def_levels, rep_levels, |
| num_levels, values); |
| } else if constexpr (::arrow::is_large_binary_like(ArrowType::type_id)) { |
| return CalculateBinaryLike<::arrow::LargeBinaryArray>(def_levels, rep_levels, |
| num_levels, values); |
| } else if constexpr (::arrow::is_dictionary(ArrowType::type_id)) { |
| return GetChunks(def_levels, rep_levels, num_levels, |
| *static_cast<const ::arrow::DictionaryArray&>(values).indices()); |
| } else { |
| throw ParquetException("Unsupported Arrow array type " + |
| values.type()->ToString()); |
| } |
| }; |
| return ::arrow::VisitType(*values.type(), handle_type); |
| } |
| |
| private: |
| // Reference to the column's level information |
| const internal::LevelInfo& level_info_; |
| // Minimum chunk size in bytes, the rolling hash will not be updated until this size is |
| // reached for each chunk. Note that all data sent through the hash function is counted |
| // towards the chunk size, including definition and repetition levels. |
| const int64_t min_chunk_size_; |
| const int64_t max_chunk_size_; |
| // The mask to match the rolling hash against to determine if a new chunk should be |
| // created. The mask is calculated based on min/max chunk size and the normalization |
| // level. |
| const uint64_t rolling_hash_mask_; |
| |
| // Whether the rolling hash has matched the mask since the last chunk creation. This |
| // flag is set to true by the Roll() methods when the mask is matched and reset to |
| // false by the NeedNewChunk() method. |
| bool has_matched_ = false; |
| // The current run of the rolling hash, used to normalize the chunk size distribution |
| // by requiring multiple consecutive matches to create a new chunk. |
| int8_t nth_run_ = 0; |
| // Current chunk size in bytes, reset to 0 when a new chunk is created. |
| int64_t chunk_size_ = 0; |
| // Rolling hash state; initialized once for the entire column and never reset. |
| uint64_t rolling_hash_ = 0; |
| }; |
| |
| ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info, |
| int64_t min_chunk_size, |
| int64_t max_chunk_size, int norm_level) |
| : impl_(new Impl(level_info, min_chunk_size, max_chunk_size, norm_level)) {} |
| |
| ContentDefinedChunker::~ContentDefinedChunker() = default; |
| |
| std::vector<Chunk> ContentDefinedChunker::GetChunks(const int16_t* def_levels, |
| const int16_t* rep_levels, |
| int64_t num_levels, |
| const ::arrow::Array& values) { |
| return impl_->GetChunks(def_levels, rep_levels, num_levels, values); |
| } |
| |
| uint64_t ContentDefinedChunker::GetRollingHashMask() const { |
| return impl_->GetRollingHashMask(); |
| } |
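| |
| // Illustrative (hypothetical) caller-side usage, assuming `level_info`, `def_levels`, |
| // `rep_levels`, `num_levels` and a `values` ::arrow::Array are already available in |
| // the column writer: |
| // |
| //   ContentDefinedChunker chunker(level_info, /*min_chunk_size=*/256 * 1024, |
| //                                 /*max_chunk_size=*/1024 * 1024, /*norm_level=*/0); |
| //   for (const Chunk& chunk : chunker.GetChunks(def_levels, rep_levels, |
| //                                               num_levels, values)) { |
| //     // write `chunk.levels_to_write` levels starting at `chunk.level_offset` and |
| //     // the corresponding values starting at `chunk.value_offset`, e.g. as a |
| //     // separate data page |
| //   } |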
| |
| } // namespace parquet::internal |