| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_RLE_ENCODING_H |
| #define IMPALA_RLE_ENCODING_H |
| |
| #include <math.h> |
| |
| #include "common/compiler-util.h" |
| #include "util/bit-stream-utils.inline.h" |
| #include "util/bit-util.h" |
| #include "util/mem-util.h" |
| #include "util/test-info.h" |
| |
| namespace impala { |
| |
| /// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
| /// are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
| /// (literal encoding). |
| /// For both types of runs, there is a byte-aligned indicator which encodes the length |
| /// of the run and the type of the run. |
| /// This encoding has the benefit that when there aren't any long enough runs, values |
| /// are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
| /// the run length are byte aligned. This allows for very efficient decoding |
| /// implementations. |
| /// The encoding is: |
| /// encoded-block := run* |
| /// run := literal-run | repeated-run |
| /// literal-run := literal-indicator < literal bytes > |
| /// repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
| /// literal-indicator := varint_encode( number_of_groups << 1 | 1) |
| /// repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
| // |
| /// Each run is preceded by a varint. The varint's least significant bit is |
| /// used to indicate whether the run is a literal run or a repeated run. The rest |
| /// of the varint is used to determine the length of the run (eg how many times the |
| /// value repeats). |
| // |
| /// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
| /// in groups of 8), so that no matter the bit-width of the value, the sequence will end |
| /// on a byte boundary without padding. |
| /// Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
| /// the actual number of encoded ints. (This means that the total number of encoded values |
| /// can not be determined from the encoded data, since the number of values in the last |
| /// group may not be a multiple of 8). For the last group of literal runs, we pad |
| /// the group to 8 with zeros. This allows for 8 at a time decoding on the read side |
| /// without the need for additional checks. |
| // |
| /// There is a break-even point when it is more storage efficient to do run length |
| /// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes |
| /// for both the repeated encoding or the literal encoding. This value can always |
| /// be computed based on the bit-width. |
| /// TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more |
| /// investigation is needed to do this efficiently, see the reverted IMPALA-6658. |
| /// TODO: think about how to use this for strings. The bit packing isn't quite the same. |
| // |
| /// Examples with bit-width 1 (eg encoding booleans): |
| /// ---------------------------------------- |
| /// 100 1s followed by 100 0s: |
| /// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
| /// - (total 4 bytes) |
| // |
| /// alternating 1s and 0s (200 total): |
| /// 200 ints = 25 groups of 8 |
| /// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
| /// (total 26 bytes, 1 byte overhead) |
| // |
| |
| /// RLE decoder with a batch-oriented interface that enables fast decoding. |
| /// Users of this class must first initialize the class to point to a buffer of |
| /// RLE-encoded data, passed into the constructor or Reset(). The provided |
| /// bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH). |
| /// Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to |
| /// see if the next run is a repeated or literal run, then calling |
| /// GetRepeatedValue() or GetLiteralValues() respectively to read the values. |
| /// |
| /// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0. |
| /// Other decoding errors are signalled by functions returning false. If an |
| /// error is encountered then it is not valid to read any more data until |
| /// Reset() is called. |
| template <typename T> |
| class RleBatchDecoder { |
| public: |
| RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) { |
| Reset(buffer, buffer_len, bit_width); |
| } |
| RleBatchDecoder() {} |
| |
| /// Reset the decoder to read from a new buffer. |
| void Reset(uint8_t* buffer, int buffer_len, int bit_width); |
| |
| /// Return the size of the current repeated run. Returns zero if the current run is |
| /// a literal run or if no more runs can be read from the input. |
| int32_t NextNumRepeats(); |
| |
| /// Get the value of the current repeated run and consume the given number of repeats. |
| /// Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot |
| /// be greater than the remaining number of repeats in the run. 'num_repeats_to_consume' |
| /// can be set to 0 to peek at the value without consuming repeats. |
| T GetRepeatedValue(int32_t num_repeats_to_consume); |
| |
| /// Return the size of the current literal run. Returns zero if the current run is |
| /// a repeated run or if no more runs can be read from the input. |
| int32_t NextNumLiterals(); |
| |
| /// Consume 'num_literals_to_consume' literals from the current literal run, |
| /// copying the values to 'values'. 'num_literals_to_consume' must be <= |
| /// NextNumLiterals(). Returns true if the requested number of literals were |
| /// successfully read or false if an error was encountered, e.g. the input was |
| /// truncated. |
| bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT; |
| |
| /// Consume 'num_literals_to_consume' literals from the current literal run, |
| /// decoding them using 'dict' and outputting them to 'out'. |
| /// 'num_literals_to_consume' must be <= NextNumLiterals(). Returns true if |
| /// the requested number of literals were successfully read or false if an error |
| /// was encountered, e.g. the input was truncated or the value was not present |
| /// in the dictionary. Errors can only be recovered from by calling Reset() |
| /// to read from a new buffer. |
| template <typename OutType> |
| bool DecodeLiteralValues(int32_t num_literals_to_consume, OutType* dict, |
| int64_t dict_len, StrideWriter<OutType>* RESTRICT out) WARN_UNUSED_RESULT; |
| |
| /// Convenience method to get the next value. Not efficient. Returns true on success |
| /// or false if no more values can be read from the input or an error was encountered |
| /// decoding the values. |
| bool GetSingleValue(T* val) WARN_UNUSED_RESULT; |
| |
| /// Consume 'num_values_to_consume' values and copy them to 'values'. |
| /// Returns the number of consumed values or 0 if an error occurred. |
| int32_t GetValues(int32_t num_values_to_consume, T* values); |
| |
| /// Skip 'num_values' values. |
| /// Returns the number of skipped values or 0 if an error occurred. |
| int32_t SkipValues(int32_t num_values); |
| |
| private: |
| /// Skip 'num_literals_to_skip' literals. |
| bool SkipLiteralValues(int32_t num_literals_to_skip) WARN_UNUSED_RESULT; |
| |
| /// Skip 'num_values' repeated values. |
| void SkipRepeatedValues(int32_t num_values); |
| |
| BatchedBitReader bit_reader_; |
| |
| /// Number of bits needed to encode the value. Must be between 0 and 64 after |
| /// the decoder is initialized with a buffer. -1 indicates the decoder was not |
| /// initialized. |
| int bit_width_ = -1; |
| |
| /// If a repeated run, the number of repeats remaining in the current run to be read. |
| /// If the current run is a literal run, this is 0. |
| int32_t repeat_count_ = 0; |
| |
| /// If a literal run, the number of literals remaining in the current run to be read. |
| /// If the current run is a repeated run, this is 0. |
| int32_t literal_count_ = 0; |
| |
| /// If a repeated run, the current repeated value. |
| T repeated_value_; |
| |
| /// Size of buffer for literal values. Large enough to decode a full batch of 32 |
| /// literals. The buffer is needed to allow clients to read in batches that are not |
| /// multiples of 32. |
| static constexpr int LITERAL_BUFFER_LEN = 32; |
| |
| /// Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the |
| /// position of the next literal to be read from the buffer. |
| T literal_buffer_[LITERAL_BUFFER_LEN]; |
| int num_buffered_literals_ = 0; |
| int literal_buffer_pos_ = 0; |
| |
| /// Called when both 'literal_count_' and 'repeat_count_' have been exhausted. |
| /// Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal |
| /// or repeated run, or leaves both at 0 if no more values can be read (either because |
| /// the end of the input was reached or an error was encountered decoding). |
| void NextCounts(); |
| |
| /// Fill the literal buffer. Invalid to call if there are already buffered literals. |
| /// Return false if the input was truncated. This does not advance 'literal_count_'. |
| bool FillLiteralBuffer() WARN_UNUSED_RESULT; |
| |
| bool HaveBufferedLiterals() const { |
| return literal_buffer_pos_ < num_buffered_literals_; |
| } |
| |
| /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing |
| /// 'literal_count_'. Returns the number of literals outputted. |
| int32_t OutputBufferedLiterals(int32_t max_to_output, T* values); |
| |
| /// Skip buffered literals |
| int32_t SkipBufferedLiterals(int32_t max_to_skip); |
| |
| /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing |
| /// 'literal_count_'. Returns the number of literals outputted or 0 if a |
| /// decoding error is encountered. |
| template <typename OutType> |
| int32_t DecodeBufferedLiterals(int32_t max_to_output, OutType* dict, int64_t dict_len, |
| StrideWriter<OutType>* RESTRICT out); |
| }; |
| |
| /// Class to incrementally build the rle data. This class does not allocate any memory. |
| /// The encoding has two modes: encoding repeated runs and literal runs. |
| /// If the run is sufficiently short, it is more efficient to encode as a literal run. |
| /// This class does so by buffering 8 values at a time. If they are not all the same |
| /// they are added to the literal run. If they are the same, they are added to the |
| /// repeated run. When we switch modes, the previous run is flushed out. |
| class RleEncoder { |
| public: |
| /// buffer/buffer_len: preallocated output buffer. |
| /// bit_width: max number of bits for value. |
| /// TODO: consider adding a min_repeated_run_length so the caller can control |
| /// when values should be encoded as repeated runs. Currently this is derived |
| /// based on the bit_width, which can determine a storage optimal choice. |
| /// TODO: allow 0 bit_width (and have dict encoder use it) |
| RleEncoder(uint8_t* buffer, int buffer_len, int bit_width) |
| : bit_width_(bit_width), bit_writer_(buffer, buffer_len) { |
| DCHECK_GE(bit_width_, 0); |
| DCHECK_LE(bit_width_, 64); |
| max_run_byte_size_ = MinBufferSize(bit_width); |
| DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough."; |
| Clear(); |
| } |
| |
| /// Returns the minimum buffer size needed to use the encoder for 'bit_width' |
| /// This is the maximum length of a single run for 'bit_width'. |
| /// It is not valid to pass a buffer less than this length. |
| static int MinBufferSize(int bit_width) { |
| /// 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values. |
| int max_literal_run_size = |
| 1 + BitUtil::Ceil(MAX_VALUES_PER_LITERAL_RUN * bit_width, 8); |
| /// Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value. |
| int max_repeated_run_size = |
| BatchedBitReader::max_vlq_byte_len<uint32_t>() + BitUtil::Ceil(bit_width, 8); |
| return std::max(max_literal_run_size, max_repeated_run_size); |
| } |
| |
| /// Returns the maximum byte size it could take to encode 'num_values'. |
| static int MaxBufferSize(int bit_width, int num_values) { |
| int bytes_per_run = BitUtil::Ceil(bit_width * MAX_VALUES_PER_LITERAL_RUN, 8.0); |
| int num_runs = BitUtil::Ceil(num_values, MAX_VALUES_PER_LITERAL_RUN); |
| int literal_max_size = num_runs + num_runs * bytes_per_run; |
| return std::max(MinBufferSize(bit_width), literal_max_size); |
| } |
| |
| /// Encode value. Returns true if the value fits in buffer, false otherwise. |
| /// This value must be representable with bit_width_ bits. |
| bool Put(uint64_t value) WARN_UNUSED_RESULT; |
| |
| /// Flushes any pending values to the underlying buffer. |
| /// Returns the total number of bytes written |
| int Flush(); |
| |
| /// Resets all the state in the encoder. |
| void Clear(); |
| |
| /// Returns pointer to underlying buffer |
| uint8_t* buffer() { return bit_writer_.buffer(); } |
| int32_t len() { return bit_writer_.bytes_written(); } |
| bool buffer_full() const { return buffer_full_; } |
| |
| private: |
| /// Flushes any buffered values. If this is part of a repeated run, this is largely |
| /// a no-op. |
| /// If it is part of a literal run, this will call FlushLiteralRun, which writes |
| /// out the buffered literal values. |
| /// If 'done' is true, the current run would be written even if it would normally |
| /// have been buffered more. This should only be called at the end, when the |
| /// encoder has received all values even if it would normally continue to be |
| /// buffered. |
| void FlushBufferedValues(bool done); |
| |
| /// Flushes literal values to the underlying buffer. If update_indicator_byte, |
| /// then the current literal run is complete and the indicator byte is updated. |
| void FlushLiteralRun(bool update_indicator_byte); |
| |
| /// Flushes a repeated run to the underlying buffer. |
| void FlushRepeatedRun(); |
| |
| /// Checks and sets buffer_full_. This must be called after flushing a run to |
| /// make sure there are enough bytes remaining to encode the next run. |
| void CheckBufferFull(); |
| |
| /// The maximum number of values in a single literal run |
| /// (number of groups encodable by a 1-byte indicator * 8) |
| static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8; |
| |
| /// Number of bits needed to encode the value. Must be between 0 and 64. |
| const int bit_width_; |
| |
| /// Underlying buffer. |
| BitWriter bit_writer_; |
| |
| /// If true, the buffer is full and subsequent Put()'s will fail. |
| bool buffer_full_; |
| |
| /// The maximum byte size a single run can take. |
| int max_run_byte_size_; |
| |
| /// We need to buffer at most 8 values for literals. This happens when the |
| /// bit_width is 1 (so 8 values fit in one byte). |
| /// TODO: generalize this to other bit widths |
| int64_t buffered_values_[8]; |
| |
| /// Number of values in buffered_values_ |
| int num_buffered_values_; |
| |
| /// The current (also last) value that was written and the count of how |
| /// many times in a row that value has been seen. This is maintained even |
| /// if we are in a literal run. If the repeat_count_ get high enough, we switch |
| /// to encoding repeated runs. |
| int64_t current_value_; |
| int repeat_count_; |
| |
| /// Number of literals in the current run. This does not include the literals |
| /// that might be in buffered_values_. Only after we've got a group big enough |
| /// can we decide if they should part of the literal_count_ or repeat_count_ |
| int literal_count_; |
| |
| /// Pointer to a byte in the underlying buffer that stores the indicator byte. |
| /// This is reserved as soon as we need a literal run but the value is written |
| /// when the literal run is complete. |
| uint8_t* literal_indicator_byte_; |
| }; |
| |
| /// This function buffers input values 8 at a time. After seeing all 8 values, |
| /// it decides whether they should be encoded as a literal or repeated run. |
| inline bool RleEncoder::Put(uint64_t value) { |
| DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); |
| if (UNLIKELY(buffer_full_)) return false; |
| |
| if (LIKELY(current_value_ == value |
| && repeat_count_ < std::numeric_limits<int32_t>::max())) { |
| ++repeat_count_; |
| if (repeat_count_ > 8) { |
| // This is just a continuation of the current run, no need to buffer the |
| // values. |
| // Note that this is the fast path for long repeated runs. |
| return true; |
| } |
| } else { |
| if (repeat_count_ >= 8) { |
| // We had a run that was long enough but it ended, either because of a different |
| // value or because it exceeded the maximum run length. Flush the current repeated |
| // run. |
| DCHECK_EQ(literal_count_, 0); |
| FlushRepeatedRun(); |
| } |
| repeat_count_ = 1; |
| current_value_ = value; |
| } |
| |
| buffered_values_[num_buffered_values_] = value; |
| if (++num_buffered_values_ == 8) { |
| DCHECK_EQ(literal_count_ % 8, 0); |
| FlushBufferedValues(false); |
| } |
| return true; |
| } |
| |
| inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) { |
| if (literal_indicator_byte_ == NULL) { |
| // The literal indicator byte has not been reserved yet, get one now. |
| literal_indicator_byte_ = bit_writer_.GetNextBytePtr(); |
| DCHECK(literal_indicator_byte_ != NULL); |
| } |
| |
| // Write all the buffered values as bit packed literals |
| for (int i = 0; i < num_buffered_values_; ++i) { |
| bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_); |
| DCHECK(success) << "There is a bug in using CheckBufferFull()"; |
| } |
| num_buffered_values_ = 0; |
| |
| if (update_indicator_byte) { |
| // At this point we need to write the indicator byte for the literal run. |
| // We only reserve one byte, to allow for streaming writes of literal values. |
| // The logic makes sure we flush literal runs often enough to not overrun |
| // the 1 byte. |
| DCHECK_EQ(literal_count_ % 8, 0); |
| int num_groups = literal_count_ / 8; |
| int32_t indicator_value = (num_groups << 1) | 1; |
| DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); |
| *literal_indicator_byte_ = indicator_value; |
| literal_indicator_byte_ = NULL; |
| literal_count_ = 0; |
| CheckBufferFull(); |
| } |
| } |
| |
| inline void RleEncoder::FlushRepeatedRun() { |
| DCHECK_GT(repeat_count_, 0); |
| bool result = true; |
| // The lsb of 0 indicates this is a repeated run |
| uint32_t indicator_value = static_cast<uint32_t>(repeat_count_) << 1; |
| result &= bit_writer_.PutUleb128<uint32_t>(indicator_value); |
| result &= bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); |
| DCHECK(result); |
| num_buffered_values_ = 0; |
| repeat_count_ = 0; |
| CheckBufferFull(); |
| } |
| |
| /// Flush the values that have been buffered. At this point we decide whether |
| /// we need to switch between the run types or continue the current one. |
| inline void RleEncoder::FlushBufferedValues(bool done) { |
| if (repeat_count_ >= 8) { |
| // Clear the buffered values. They are part of the repeated run now and we |
| // don't want to flush them out as literals. |
| num_buffered_values_ = 0; |
| if (literal_count_ != 0) { |
| // There was a current literal run. All the values in it have been flushed |
| // but we still need to update the indicator byte. |
| DCHECK_EQ(literal_count_ % 8, 0); |
| DCHECK_EQ(repeat_count_, 8); |
| FlushLiteralRun(true); |
| } |
| DCHECK_EQ(literal_count_, 0); |
| return; |
| } |
| |
| literal_count_ += num_buffered_values_; |
| DCHECK_EQ(literal_count_ % 8, 0); |
| int num_groups = literal_count_ / 8; |
| if (num_groups + 1 >= (1 << 6)) { |
| // We need to start a new literal run because the indicator byte we've reserved |
| // cannot store more values. |
| DCHECK(literal_indicator_byte_ != NULL); |
| FlushLiteralRun(true); |
| } else { |
| FlushLiteralRun(done); |
| } |
| repeat_count_ = 0; |
| } |
| |
| inline int RleEncoder::Flush() { |
| if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { |
| bool all_repeat = literal_count_ == 0 && |
| (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); |
| // There is something pending, figure out if it's a repeated or literal run |
| if (repeat_count_ > 0 && all_repeat) { |
| FlushRepeatedRun(); |
| } else { |
| DCHECK_EQ(literal_count_ % 8, 0); |
| // Buffer the last group of literals to 8 by padding with 0s. |
| for (; num_buffered_values_ != 0 && num_buffered_values_ < 8; |
| ++num_buffered_values_) { |
| buffered_values_[num_buffered_values_] = 0; |
| } |
| literal_count_ += num_buffered_values_; |
| FlushLiteralRun(true); |
| repeat_count_ = 0; |
| } |
| } |
| bit_writer_.Flush(); |
| DCHECK_EQ(num_buffered_values_, 0); |
| DCHECK_EQ(literal_count_, 0); |
| DCHECK_EQ(repeat_count_, 0); |
| |
| return bit_writer_.bytes_written(); |
| } |
| |
| inline void RleEncoder::CheckBufferFull() { |
| int bytes_written = bit_writer_.bytes_written(); |
| if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) { |
| buffer_full_ = true; |
| } |
| } |
| |
| inline void RleEncoder::Clear() { |
| buffer_full_ = false; |
| current_value_ = 0; |
| repeat_count_ = 0; |
| num_buffered_values_ = 0; |
| literal_count_ = 0; |
| literal_indicator_byte_ = NULL; |
| bit_writer_.Clear(); |
| } |
| |
| template <typename T> |
| inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) { |
| DCHECK(buffer != nullptr); |
| DCHECK_GE(buffer_len, 0); |
| DCHECK_GE(bit_width, 0); |
| DCHECK_LE(bit_width, sizeof(T) * 8); |
| DCHECK_LE(bit_width, BatchedBitReader::MAX_BITWIDTH); |
| bit_reader_.Reset(buffer, buffer_len); |
| bit_width_ = bit_width; |
| repeat_count_ = 0; |
| literal_count_ = 0; |
| num_buffered_literals_ = 0; |
| literal_buffer_pos_ = 0; |
| } |
| |
| template <typename T> |
| inline int32_t RleBatchDecoder<T>::NextNumRepeats() { |
| if (repeat_count_ > 0) return repeat_count_; |
| if (literal_count_ == 0) NextCounts(); |
| return repeat_count_; |
| } |
| |
| template <typename T> |
| inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) { |
| DCHECK_GE(num_repeats_to_consume, 0); |
| DCHECK_GE(repeat_count_, num_repeats_to_consume); |
| repeat_count_ -= num_repeats_to_consume; |
| return repeated_value_; |
| } |
| |
| template <typename T> |
| inline void RleBatchDecoder<T>::SkipRepeatedValues(int32_t num_values) { |
| DCHECK_GT(num_values, 0); |
| DCHECK_GE(repeat_count_, num_values); |
| repeat_count_ -= num_values; |
| } |
| |
| template <typename T> |
| inline int32_t RleBatchDecoder<T>::NextNumLiterals() { |
| if (literal_count_ > 0) return literal_count_; |
| if (repeat_count_ == 0) NextCounts(); |
| return literal_count_; |
| } |
| |
| template <typename T> |
| inline bool RleBatchDecoder<T>::GetLiteralValues( |
| int32_t num_literals_to_consume, T* values) { |
| DCHECK_GE(num_literals_to_consume, 0); |
| DCHECK_GE(literal_count_, num_literals_to_consume); |
| int32_t num_consumed = 0; |
| // Copy any buffered literals left over from previous calls. |
| if (HaveBufferedLiterals()) { |
| num_consumed = OutputBufferedLiterals(num_literals_to_consume, values); |
| } |
| |
| int32_t num_remaining = num_literals_to_consume - num_consumed; |
| // Copy literals directly to the output, bypassing 'literal_buffer_' when possible. |
| // Need to round to a batch of 32 if the caller is consuming only part of the current |
| // run avoid ending on a non-byte boundary. |
| int32_t num_to_bypass = std::min<int32_t>(literal_count_, |
| BitUtil::RoundDownToPowerOf2(num_remaining, 32)); |
| if (num_to_bypass > 0) { |
| int num_read = |
| bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed); |
| // If we couldn't read the expected number, that means the input was truncated. |
| if (num_read < num_to_bypass) return false; |
| literal_count_ -= num_to_bypass; |
| num_consumed += num_to_bypass; |
| num_remaining = num_literals_to_consume - num_consumed; |
| } |
| |
| if (num_remaining > 0) { |
| // We weren't able to copy all the literals requested directly from the input. |
| // Buffer literals and copy over the requested number. |
| if (UNLIKELY(!FillLiteralBuffer())) return false; |
| int32_t num_copied = OutputBufferedLiterals(num_remaining, values + num_consumed); |
| DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals"; |
| } |
| return true; |
| } |
| |
| template <typename T> |
| inline bool RleBatchDecoder<T>::SkipLiteralValues(int32_t num_literals_to_skip) { |
| DCHECK_GT(num_literals_to_skip, 0); |
| DCHECK_GE(literal_count_, num_literals_to_skip); |
| DCHECK(!HaveBufferedLiterals()); |
| |
| int32_t num_remaining = num_literals_to_skip; |
| |
| // Need to round to a batch of 32 if the caller is skipping only part of the current |
| // run to avoid ending on a non-byte boundary. |
| int32_t num_to_skip = std::min<int32_t>(literal_count_, |
| BitUtil::RoundDownToPowerOf2(num_remaining, 32)); |
| if (num_to_skip > 0) { |
| bit_reader_.SkipBatch(bit_width_, num_to_skip); |
| literal_count_ -= num_to_skip; |
| num_remaining -= num_to_skip; |
| } |
| |
| if (num_remaining > 0) { |
| // Earlier we called RoundDownToPowerOf2() to skip literals that fit on byte boundary. |
| // But some literals still need to be skipped. Let's fill the literal buffer |
| // and skip 'num_remaining' values. |
| if (UNLIKELY(!FillLiteralBuffer())) return false; |
| if (SkipBufferedLiterals(num_remaining) != num_remaining) return false; |
| } |
| return true; |
| } |
| |
| template <typename T> |
| template <typename OutType> |
| inline bool RleBatchDecoder<T>::DecodeLiteralValues(int32_t num_literals_to_consume, |
| OutType* dict, int64_t dict_len, StrideWriter<OutType>* RESTRICT out) { |
| DCHECK_GT(num_literals_to_consume, 0); |
| DCHECK_GE(literal_count_, num_literals_to_consume); |
| |
| if (num_literals_to_consume == 0) return false; |
| |
| int32_t num_remaining = num_literals_to_consume; |
| // Decode any buffered literals left over from previous calls. |
| if (HaveBufferedLiterals()) { |
| int32_t num_consumed = DecodeBufferedLiterals(num_remaining, dict, dict_len, out); |
| if (UNLIKELY(num_consumed == 0)) return false; |
| DCHECK_LE(num_consumed, num_remaining); |
| num_remaining -= num_consumed; |
| } |
| |
| // Copy literals directly to the output, bypassing 'literal_buffer_' when possible. |
| // Need to round to a batch of 32 if the caller is consuming only part of the current |
| // run avoid ending on a non-byte boundery. |
| int32_t num_to_bypass = |
| std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32)); |
| if (num_to_bypass > 0) { |
| int num_read = bit_reader_.UnpackAndDecodeBatch( |
| bit_width_, dict, dict_len, num_to_bypass, out->current, out->stride); |
| // If we couldn't read the expected number, that means the input was truncated. |
| if (num_read < num_to_bypass) return false; |
| DCHECK_EQ(num_read, num_to_bypass); |
| literal_count_ -= num_to_bypass; |
| out->SkipNext(num_to_bypass); |
| num_remaining -= num_to_bypass; |
| } |
| |
| if (num_remaining > 0) { |
| // We weren't able to copy all the literals requested directly from the input. |
| // Buffer literals and copy over the requested number. |
| if (UNLIKELY(!FillLiteralBuffer())) return false; |
| int32_t num_copied = DecodeBufferedLiterals(num_remaining, dict, dict_len, out); |
| if (UNLIKELY(num_copied == 0)) return false; |
| DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals"; |
| } |
| return true; |
| } |
| |
| template <typename T> |
| inline bool RleBatchDecoder<T>::GetSingleValue(T* val) { |
| if (NextNumRepeats() > 0) { |
| DCHECK_EQ(0, NextNumLiterals()); |
| *val = GetRepeatedValue(1); |
| return true; |
| } |
| if (NextNumLiterals() > 0) { |
| DCHECK_EQ(0, NextNumRepeats()); |
| return GetLiteralValues(1, val); |
| } |
| return false; |
| } |
| |
| template <typename T> |
| inline void RleBatchDecoder<T>::NextCounts() { |
| DCHECK_GE(bit_width_, 0) << "RleBatchDecoder must be initialised"; |
| DCHECK_EQ(0, literal_count_); |
| DCHECK_EQ(0, repeat_count_); |
| // Read the next run's indicator int, it could be a literal or repeated run. |
| // The int is encoded as a ULEB128-encoded value. |
| uint32_t indicator_value = 0; |
| if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) return; |
| |
| // lsb indicates if it is a literal run or repeated run |
| bool is_literal = indicator_value & 1; |
| // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully. |
| // The Parquet standard does not allow longer runs - see PARQUET-1290. |
| uint32_t run_len = indicator_value >> 1; |
| DCHECK_LE(run_len, std::numeric_limits<int32_t>::max()) |
| << "Right-shifted uint32_t should fit in int32_t"; |
| if (is_literal) { |
| // Use int64_t to avoid overflowing multiplication. |
| int64_t literal_count = static_cast<int64_t>(run_len) * 8; |
| if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return; |
| literal_count_ = literal_count; |
| } else { |
| if (UNLIKELY(run_len == 0)) return; |
| bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_); |
| if (UNLIKELY(!result)) return; |
| repeat_count_ = run_len; |
| DCHECK_GE(repeat_count_, 0); |
| } |
| } |
| |
| template <typename T> |
| inline bool RleBatchDecoder<T>::FillLiteralBuffer() { |
| DCHECK(!HaveBufferedLiterals()); |
| int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_); |
| num_buffered_literals_ = |
| bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_); |
| // If we couldn't read the expected number, that means the input was truncated. |
| if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false; |
| literal_buffer_pos_ = 0; |
| return true; |
| } |
| |
| template <typename T> |
| inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals( |
| int32_t max_to_output, T* values) { |
| int32_t num_to_output = |
| std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_); |
| memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output); |
| literal_buffer_pos_ += num_to_output; |
| literal_count_ -= num_to_output; |
| return num_to_output; |
| } |
| |
| template <typename T> |
| inline int32_t RleBatchDecoder<T>::SkipBufferedLiterals( |
| int32_t max_to_skip) { |
| int32_t num_to_skip = |
| std::min<int32_t>(max_to_skip, num_buffered_literals_ - literal_buffer_pos_); |
| literal_buffer_pos_ += num_to_skip; |
| literal_count_ -= num_to_skip; |
| return num_to_skip; |
| } |
| |
| template <typename T> |
| template <typename OutType> |
| inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(int32_t max_to_output, |
| OutType* dict, int64_t dict_len, StrideWriter<OutType>* RESTRICT out) { |
| int32_t num_to_output = |
| std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_); |
| for (int32_t i = 0; i < num_to_output; ++i) { |
| T idx = literal_buffer_[literal_buffer_pos_ + i]; |
| if (UNLIKELY(idx < 0 || idx >= dict_len)) return 0; |
| out->SetNext(dict[idx]); |
| } |
| literal_buffer_pos_ += num_to_output; |
| literal_count_ -= num_to_output; |
| return num_to_output; |
| } |
| |
| template <typename T> |
| inline int32_t RleBatchDecoder<T>::GetValues(int32_t num_values_to_consume, T* values) { |
| DCHECK_GT(num_values_to_consume, 0); |
| DCHECK(values != nullptr); |
| |
| int32_t num_consumed = 0; |
| while (num_consumed < num_values_to_consume) { |
| // Add RLE encoded values by repeating the current value this number of times. |
| int32_t num_repeats = NextNumRepeats(); |
| if (num_repeats > 0) { |
| int32_t num_repeats_to_set = |
| std::min(num_repeats, num_values_to_consume - num_consumed); |
| T repeated_value = GetRepeatedValue(num_repeats_to_set); |
| for (int i = 0; i < num_repeats_to_set; ++i) { |
| values[num_consumed + i] = repeated_value; |
| } |
| num_consumed += num_repeats_to_set; |
| continue; |
| } |
| |
| // Add remaining literal values, if any. |
| int32_t num_literals = NextNumLiterals(); |
| if (num_literals == 0) break; |
| int32_t num_literals_to_set = |
| std::min(num_literals, num_values_to_consume - num_consumed); |
| if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) { |
| return 0; |
| } |
| num_consumed += num_literals_to_set; |
| } |
| return num_consumed; |
| } |
| |
| template <typename T> |
| inline int32_t RleBatchDecoder<T>::SkipValues(int32_t num_values) { |
| DCHECK_GT(num_values, 0); |
| |
| int32_t num_skipped = 0; |
| if (HaveBufferedLiterals()) { |
| num_skipped = SkipBufferedLiterals(num_values); |
| } |
| while (num_skipped < num_values) { |
| // Skip RLE encoded values |
| int32_t num_repeats = NextNumRepeats(); |
| if (num_repeats > 0) { |
| int32_t num_repeats_to_consume = std::min(num_repeats, num_values - num_skipped); |
| SkipRepeatedValues(num_repeats_to_consume); |
| num_skipped += num_repeats_to_consume; |
| continue; |
| } |
| |
| // Skip literals |
| int32_t num_literals = NextNumLiterals(); |
| if (num_literals == 0) break; |
| int32_t num_literals_to_skip = std::min(num_literals, num_values - num_skipped); |
| if (!SkipLiteralValues(num_literals_to_skip)) return 0; |
| num_skipped += num_literals_to_skip; |
| } |
| return num_skipped; |
| } |
| |
| template <typename T> |
| constexpr int RleBatchDecoder<T>::LITERAL_BUFFER_LEN; |
| } |
| |
| #endif |