blob: f440332d6d801394c8ccac7bad0ed7e2ab732ebc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_UTIL_DICT_ENCODING_H
#define IMPALA_UTIL_DICT_ENCODING_H
#include <map>
#include <boost/unordered_map.hpp>
#include "common/compiler-util.h"
#include "exec/parquet/parquet-common.h"
#include "gutil/strings/substitute.h"
#include "runtime/mem-pool.h"
#include "runtime/string-value.h"
#include "util/bit-util.h"
#include "util/mem-util.h"
#include "util/rle-encoding.h"
#include "util/ubsan.h"
namespace impala {
/// See the dictionary encoding section of https://github.com/Parquet/parquet-format.
/// This class supports dictionary encoding of all Impala types.
/// The encoding supports streaming encoding. Values are encoded as they are added while
/// the dictionary is being constructed. At any time, the buffered values can be
/// written out with the current dictionary size. More values can then be added to
/// the encoder, including new dictionary entries.
/// TODO: if the dictionary was made to be ordered, the dictionary would compress better.
/// Add this to the spec as future improvement.
/// Base class for encoders. This is convenient so users can have a type that
/// abstracts over the actual dictionary type.
/// Note: it does not provide a virtual Put(). Users are expected to know the subclass
/// type when using Put().
/// TODO: once we can easily remove virtual calls with codegen, this interface can
/// rely less on templating and be easier to follow. The type should be passed in
/// as an argument rather than template argument.
class DictEncoderBase {
public:
virtual ~DictEncoderBase() {
DCHECK(buffered_indices_.empty());
ReleaseBytes();
DCHECK_EQ(dict_bytes_cnt_, 0);
}
/// This function will clear the buffered_indices and
/// decrement the bytes used by dictionary.
void Close() {
ClearIndices();
ReleaseBytes();
}
/// Writes out the encoded dictionary to buffer. buffer must be preallocated to
/// dict_encoded_size() bytes.
virtual void WriteDict(uint8_t* buffer) = 0;
/// The number of entries in the dictionary.
virtual int num_entries() const = 0;
/// Clears all the indices (but leaves the dictionary).
void ClearIndices() { buffered_indices_.clear(); }
/// Returns a conservative estimate of the number of bytes needed to encode the buffered
/// indices. Used to size the buffer passed to WriteData().
int EstimatedDataEncodedSize() {
return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size());
}
/// The minimum bit width required to encode the currently buffered indices.
int bit_width() const {
if (UNLIKELY(num_entries() == 0)) return 0;
if (UNLIKELY(num_entries() == 1)) return 1;
return BitUtil::Log2Ceiling64(num_entries());
}
/// Writes out any buffered indices to buffer preceded by the bit width of this data.
/// Returns the number of bytes written.
/// If the supplied buffer is not big enough, returns -1.
/// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize()
/// to size buffer.
int WriteData(uint8_t* buffer, int buffer_len);
int dict_encoded_size() { return dict_encoded_size_; }
void UsedbyTest() { used_by_test_ = true;}
protected:
DictEncoderBase(MemPool* pool, MemTracker* mem_tracker) :
dict_encoded_size_(0),
pool_(pool),
dict_bytes_cnt_(0),
dict_mem_tracker_(mem_tracker) { }
/// Indices that have not yet be written out by WriteData().
std::vector<int> buffered_indices_;
/// Memtracker Consume is called every ENC_MEM_TRACK_CNT times.
/// Periodicity of calling Memtracker Consume.
const int ENC_MEM_TRACK_CNT = 8192;
/// Number of times ConsumeBytes() was called.
int num_call_track_{0};
/// The number of bytes needed to encode the dictionary.
int dict_encoded_size_{0};
/// Pool to store StringValue data. Not owned.
MemPool* pool_{nullptr};
/// This will account for bytes consumed by nodes_
int dict_bytes_cnt_{0};
/// This will account for bytes consumed, last_accounted by memtracker
int dict_bytes_cnt_memtrack_{0};
/// This will track the memory used by nodes_
MemTracker* dict_mem_tracker_{nullptr};
/// Function to decrement the byte counter and decrease the bytes usage
/// of the memory tracker.
void ReleaseBytes() {
if (dict_mem_tracker_ != nullptr) {
dict_mem_tracker_->Release(dict_bytes_cnt_memtrack_);
dict_bytes_cnt_ = 0;
dict_bytes_cnt_memtrack_ = 0;
num_call_track_ = 0;
}
}
/// Used by dict-test.cc to check the usage of bytes by dictionary
/// is properly accounted.
bool used_by_test_{false};
/// Function to increment the byte counter and increase the bytes usage
/// of the memory tracker.
void ConsumeBytes(int num_bytes) {
if (dict_mem_tracker_ != nullptr) {
dict_bytes_cnt_ += num_bytes;
// Calling Memtracker frequently may be expensive so update
// the memtracker every ENC_MEM_TRACK_CNT times.
if (num_call_track_ % ENC_MEM_TRACK_CNT == 0 || used_by_test_ == true) {
// TODO: TryConsume() can be called to check if memory limit has been exceeded.
dict_mem_tracker_->Consume(dict_bytes_cnt_ - dict_bytes_cnt_memtrack_);
dict_bytes_cnt_memtrack_ = dict_bytes_cnt_;
}
num_call_track_++;
}
}
};
template<typename T>
class DictEncoder : public DictEncoderBase {
public:
DictEncoder(MemPool* pool, int encoded_value_size, MemTracker* mem_tracker) :
DictEncoderBase(pool, mem_tracker), buckets_(HASH_TABLE_SIZE, Node::INVALID_INDEX),
encoded_value_size_(encoded_value_size) { }
/// Encode value. Returns the number of bytes added to the dictionary page length
/// (will be 0 if this value is already in the dictionary) or -1 if the dictionary is
/// full (in which case the caller should give up on dictionary encoding). Note that
/// this does not actually write any data, just buffers the value's index to be
/// written later.
int Put(const T& value);
/// This function returns the size in bytes of the dictionary vector.
/// It is used by dict-test.cc for validation of bytes consumed against
/// memory tracked.
int DictByteSize() {
return sizeof(Node) * nodes_.size();
}
virtual void WriteDict(uint8_t* buffer);
virtual int num_entries() const { return nodes_.size(); }
private:
/// Size of the table. Must be a power of 2.
enum { HASH_TABLE_SIZE = 1 << 16 };
/// Dictates an upper bound on the capacity of the hash table.
typedef uint16_t NodeIndex;
/// Hash table mapping value to dictionary index (i.e. the number used to encode this
/// value in the data). Each table entry is a index into the nodes_ vector (giving the
/// first node of a chain for this bucket) or Node::INVALID_INDEX for an empty bucket.
std::vector<NodeIndex> buckets_;
/// Node in the chained hash table.
struct Node {
Node(const T& v, const NodeIndex& n) : value(v), next(n) { }
/// The dictionary value.
T value;
/// Index into nodes_ for the next Node in the chain. INVALID_INDEX indicates end.
NodeIndex next;
/// The maximum number of values in the dictionary. Chosen to be around 60% of
/// HASH_TABLE_SIZE to limit the expected length of the chains.
enum { INVALID_INDEX = 40000 };
};
/// The nodes of the hash table. Ordered by dictionary index (and so also represents
/// the reverse mapping from encoded index to value).
std::vector<Node> nodes_;
/// Size of each encoded dictionary value. -1 for variable-length types.
int encoded_value_size_;
/// Hash function for mapping a value to a bucket.
inline uint32_t Hash(const T& value) const;
/// Adds value to the hash table and updates dict_encoded_size_. Returns the
/// number of bytes added to dict_encoded_size_.
/// bucket gives a pointer to the location (i.e. chain) to add the value
/// so that the hash for value doesn't need to be recomputed.
int AddToTable(const T& value, NodeIndex* bucket);
};
/// Number of decoded values to buffer at a time. A multiple of 32 is chosen to allow
/// efficient reading in batches from data_decoder_. Increasing the batch size up to
/// 128 seems to improve performance, but increasing further did not make a noticeable
/// difference. Defined outside DictDecoderBase to get static linkage because there is
/// no dict-encoding.cc file.
static constexpr int32_t DICT_DECODER_BUFFER_SIZE = 128;
/// Decoder class for dictionary encoded data. This class does not allocate any
/// buffers. The input buffers (dictionary buffer and RLE buffer) must be maintained
/// by the caller and valid as long as this object is.
class DictDecoderBase {
public:
DictDecoderBase(MemTracker* tracker) :
dict_bytes_cnt_(0), dict_mem_tracker_(tracker) { }
/// The rle encoded indices into the dictionary. Returns an error status if the buffer
/// is too short or the bit_width metadata in the buffer is invalid.
Status SetData(uint8_t* buffer, int buffer_len) {
DCHECK_GE(buffer_len, 0);
if (UNLIKELY(buffer_len == 0)) return Status("Dictionary cannot be 0 bytes");
uint8_t bit_width = *buffer;
if (UNLIKELY(bit_width < 0 || bit_width > sizeof(IndexType) * 8
|| bit_width > BatchedBitReader::MAX_BITWIDTH)) {
return Status(strings::Substitute("Dictionary has invalid or unsupported bit "
"width: $0", bit_width));
}
++buffer;
--buffer_len;
data_decoder_.Reset(buffer, buffer_len, bit_width);
num_repeats_ = 0;
num_literal_values_ = 0;
next_literal_idx_ = 0;
return Status::OK();
}
virtual ~DictDecoderBase() {
ReleaseBytes();
DCHECK_EQ(dict_bytes_cnt_, 0);
}
virtual int num_entries() const = 0;
/// Reads the dictionary value at the specified index into the buffer provided.
/// The buffer must be large enough to receive the datatype for this dictionary.
virtual void GetValue(int index, void* buffer) = 0;
/// This function will decrement the bytes used by dictionary, MemTracker
void Close() {
ReleaseBytes();
}
protected:
using IndexType = uint32_t;
RleBatchDecoder<IndexType> data_decoder_;
/// Greater than zero if we've started decoding a repeated run.
int64_t num_repeats_ = 0;
/// Greater than zero if we have buffered some literal values.
int num_literal_values_ = 0;
/// The index of the next decoded value to return.
int next_literal_idx_ = 0;
/// This will account for bytes consumed by dict_
int dict_bytes_cnt_{0};
/// This will track the memory used by dict_
MemTracker* dict_mem_tracker_{nullptr};
/// Function to decrement the byte counter and decrease the bytes usage
/// of the memory tracker.
void ReleaseBytes() {
if (dict_mem_tracker_ != nullptr) {
dict_mem_tracker_->Release(dict_bytes_cnt_);
dict_bytes_cnt_ = 0;
}
}
/// Function to increment the byte counter and increase the bytes usage
/// of the memory tracker.
void ConsumeBytes(int num_bytes) {
if (dict_mem_tracker_ != nullptr) {
dict_bytes_cnt_ += num_bytes;
dict_mem_tracker_->Consume(num_bytes);
}
}
};
template<typename T>
class DictDecoder : public DictDecoderBase {
public:
/// Construct empty dictionary.
DictDecoder(MemTracker* tracker):DictDecoderBase(tracker) {}
/// Initialize the decoder with an input buffer containing the dictionary.
/// 'dict_len' is the byte length of dict_buffer.
/// For string data, the decoder returns StringValues with data directly from
/// dict_buffer (i.e. no copies).
/// fixed_len_size is the size that must be passed to decode fixed-length
/// dictionary values (values stored using FIXED_LEN_BYTE_ARRAY).
/// Returns true if the dictionary values were all successfully decoded, or false
/// if the dictionary was corrupt.
template<parquet::Type::type PARQUET_TYPE>
bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size) WARN_UNUSED_RESULT;
/// Should be only called for Timestamp columns.
void SetTimestampHelper(ParquetTimestampDecoder timestamp_decoder) {
timestamp_decoder_ = timestamp_decoder;
}
virtual int num_entries() const { return dict_.size(); }
virtual void GetValue(int index, void* buffer) {
T* val_ptr = reinterpret_cast<T*>(buffer);
DCHECK_GE(index, 0);
DCHECK_LT(index, dict_.size());
*val_ptr = dict_[index];
}
/// Returns the next value. Returns false if the data is invalid.
/// For StringValues, this does not make a copy of the data. Instead,
/// the string data is from the dictionary buffer passed into the c'tor.
bool GetNextValue(T* value) WARN_UNUSED_RESULT;
/// Batched version of GetNextValue(). Reads the next 'count' values into
/// 'first_values'. Returns false if the data was invalid and 'count' values could not
/// be successfully read. 'stride' is the stride in bytes between each subsequent value.
bool GetNextValues(T* first_value, int64_t stride, int count) WARN_UNUSED_RESULT;
/// This function returns the size in bytes of the dictionary vector.
/// It is used by dict-test.cc for validation of bytes consumed against
/// memory tracked.
int DictByteSize() {
return sizeof(T) * dict_.size();
}
/// Skip 'num_values' values from the input.
bool SkipValues(int64_t num_values) WARN_UNUSED_RESULT;
private:
/// List of decoded values stored in the dict_
std::vector<T> dict_;
/// Contains extra data needed for Timestamp decoding.
ParquetTimestampDecoder timestamp_decoder_;
/// Decoded values, buffered to allow caller to consume one-by-one. If in the middle of
/// a repeated run, the first element is the current dict value. If in a literal run,
/// this contains 'num_literal_values_' values, with the next value to be returned at
/// 'next_literal_idx_'.
T decoded_values_[DICT_DECODER_BUFFER_SIZE];
/// Copy as many as possible literal values, up to 'max_to_copy' from 'decoded_values_'
/// to '*out'. Return the number copied and advance '*out'.
uint32_t CopyLiteralsToOutput(
uint32_t max_to_copy, StrideWriter<T>* RESTRICT out) RESTRICT;
/// Slow path for GetNextValue() where we need to decode new values. Should not be
/// inlined everywhere.
bool DecodeNextValue(T* value);
/// Specialized for Timestamp columns, simple proxy to ParquetPlainEncoder::Decode
/// for other types.
template<parquet::Type::type PARQUET_TYPE>
int Decode(const uint8_t* buffer, const uint8_t* buffer_end,
int fixed_len_size, T* v) {
return ParquetPlainEncoder::Decode<T, PARQUET_TYPE>(buffer, buffer_end,
fixed_len_size, v);
}
};
template<typename T>
inline int DictEncoder<T>::Put(const T& value) {
NodeIndex* bucket = &buckets_[Hash(value) & (HASH_TABLE_SIZE - 1)];
NodeIndex i = *bucket;
// Look for the value in the dictionary.
while (i != Node::INVALID_INDEX) {
const Node* n = &nodes_[i];
if (LIKELY(n->value == value)) {
// Value already in dictionary.
buffered_indices_.push_back(i);
return 0;
}
i = n->next;
}
// Value not found. Add it to the dictionary if there's space.
i = nodes_.size();
if (UNLIKELY(i >= Node::INVALID_INDEX)) return -1;
buffered_indices_.push_back(i);
return AddToTable(value, bucket);
}
template<typename T>
inline uint32_t DictEncoder<T>::Hash(const T& value) const {
return HashUtil::Hash(&value, sizeof(value), 0);
}
template<>
inline uint32_t DictEncoder<StringValue>::Hash(const StringValue& value) const {
return HashUtil::Hash(value.ptr, value.len, 0);
}
template<typename T>
inline int DictEncoder<T>::AddToTable(const T& value, NodeIndex* bucket) {
DCHECK_GT(encoded_value_size_, 0);
Node node(value, *bucket);
ConsumeBytes(sizeof(node));
// Prepend the new node to this bucket's chain.
nodes_.push_back(node);
*bucket = nodes_.size() - 1;
dict_encoded_size_ += encoded_value_size_;
return encoded_value_size_;
}
template<>
inline int DictEncoder<StringValue>::AddToTable(const StringValue& value,
NodeIndex* bucket) {
char* ptr_copy = reinterpret_cast<char*>(pool_->Allocate(value.len));
Ubsan::MemCpy(ptr_copy, value.ptr, value.len);
StringValue sv(ptr_copy, value.len);
Node node(sv, *bucket);
ConsumeBytes(sizeof(node));
// Prepend the new node to this bucket's chain.
nodes_.push_back(node);
*bucket = nodes_.size() - 1;
int bytes_added = ParquetPlainEncoder::ByteSize(sv);
dict_encoded_size_ += bytes_added;
return bytes_added;
}
// Force inlining - GCC does not always inline this into hot loops in Parquet scanner.
template <typename T>
ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValue(T* value) {
// IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16
// byte aligned for Decimal16Values.
if (num_repeats_ > 0) {
--num_repeats_;
memcpy(value, &decoded_values_[0], sizeof(T));
return true;
} else if (next_literal_idx_ < num_literal_values_) {
int idx = next_literal_idx_++;
memcpy(value, &decoded_values_[idx], sizeof(T));
return true;
}
// No decoded values left - need to decode some more.
return DecodeNextValue(value);
}
template <typename T>
ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValues(
T* first_value, int64_t stride, int count) {
DCHECK_GE(count, 0);
StrideWriter<T> out(first_value, stride);
if (num_repeats_ > 0) {
// Consume any already-decoded repeated value.
int num_to_copy = std::min<uint32_t>(num_repeats_, count);
T repeated_val = decoded_values_[0];
out.SetNext(repeated_val, num_to_copy);
count -= num_to_copy;
num_repeats_ -= num_to_copy;
} else if (next_literal_idx_ < num_literal_values_) {
// Consume any already-decoded literal values.
count -= CopyLiteralsToOutput(count, &out);
}
DCHECK_GE(count, 0);
while (count > 0) {
uint32_t num_repeats = data_decoder_.NextNumRepeats();
if (num_repeats > 0) {
// Decode repeats directly to the output.
uint32_t num_repeats_to_consume = std::min<uint32_t>(num_repeats, count);
const IndexType idx = data_decoder_.GetRepeatedValue(num_repeats_to_consume);
if (UNLIKELY(idx >= dict_.size())) return false;
T repeated_val = dict_[idx];
out.SetNext(repeated_val, num_repeats_to_consume);
count -= num_repeats_to_consume;
} else {
// Decode as many literals as possible directly to the output, buffer the rest.
uint32_t num_literals = data_decoder_.NextNumLiterals();
if (UNLIKELY(num_literals == 0)) return false;
// Case 1: decode the whole literal run directly to the output.
// Case 2: decode none or some of the run to the output, buffer some remaining.
if (count >= num_literals) { // Case 1
if (UNLIKELY(!data_decoder_.DecodeLiteralValues(
num_literals, dict_.data(), dict_.size(), &out))) {
return false;
}
count -= num_literals;
} else { // Case 2
uint32_t num_to_decode = BitUtil::RoundDown(count, 32);
if (num_to_decode > 0 && UNLIKELY(!data_decoder_.DecodeLiteralValues(
num_to_decode, dict_.data(), dict_.size(), &out))) {
return false;
}
count -= num_to_decode;
DCHECK_GE(count, 0);
if (count > 0) {
if (UNLIKELY(!DecodeNextValue(out.Advance()))) return false;
--count;
// Consume any already-decoded literal values.
count -= CopyLiteralsToOutput(count, &out);
}
return true;
}
}
}
return true;
}
template <typename T>
ALWAYS_INLINE inline bool DictDecoder<T>::SkipValues(int64_t num_values) {
int64_t num_remaining = num_values;
if (num_repeats_ > 0) {
int64_t num_to_skip = std::min(num_remaining, num_repeats_);
num_repeats_ -= num_to_skip;
num_remaining -= num_to_skip;
} else if (next_literal_idx_ < num_literal_values_) {
int64_t num_to_skip = std::min<int64_t>(num_literal_values_ -
next_literal_idx_, num_remaining);
next_literal_idx_ += num_to_skip;
num_remaining -= num_to_skip;
}
if (num_remaining > 0) return data_decoder_.SkipValues(num_remaining) == num_remaining;
return true;
}
template <typename T>
uint32_t DictDecoder<T>::CopyLiteralsToOutput(
uint32_t max_to_copy, StrideWriter<T>* out) {
uint32_t num_to_copy =
std::min<uint32_t>(num_literal_values_ - next_literal_idx_, max_to_copy);
for (uint32_t i = 0; i < num_to_copy; ++i) {
out->SetNext(decoded_values_[next_literal_idx_++]);
}
return num_to_copy;
}
template <typename T>
bool DictDecoder<T>::DecodeNextValue(T* value) {
// IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16
// byte aligned for Decimal16Values.
int32_t num_repeats = data_decoder_.NextNumRepeats();
DCHECK_GE(num_repeats, 0);
if (num_repeats > 0) {
const IndexType idx = data_decoder_.GetRepeatedValue(num_repeats);
if (UNLIKELY(idx >= dict_.size())) return false;
memcpy(&decoded_values_[0], &dict_[idx], sizeof(T));
memcpy(value, &decoded_values_[0], sizeof(T));
num_repeats_ = num_repeats - 1;
return true;
} else {
int32_t num_literals = data_decoder_.NextNumLiterals();
if (UNLIKELY(num_literals == 0)) return false;
DCHECK_GT(num_literals, 0);
int32_t num_to_decode = std::min(num_literals, DICT_DECODER_BUFFER_SIZE);
StrideWriter<T> dst(&decoded_values_[0], sizeof(T));
if (UNLIKELY(!data_decoder_.DecodeLiteralValues(num_to_decode, dict_.data(),
dict_.size(), &dst))) {
return false;
}
num_literal_values_ = num_to_decode;
memcpy(value, &decoded_values_[0], sizeof(T));
next_literal_idx_ = 1;
return true;
}
}
template<typename T>
inline void DictEncoder<T>::WriteDict(uint8_t* buffer) {
for (const Node& node: nodes_) {
buffer += ParquetPlainEncoder::Encode(node.value, encoded_value_size_, buffer);
}
}
inline int DictEncoderBase::WriteData(uint8_t* buffer, int buffer_len) {
// Write bit width in first byte
*buffer = bit_width();
++buffer;
--buffer_len;
RleEncoder encoder(buffer, buffer_len, bit_width());
for (int index: buffered_indices_) {
if (!encoder.Put(index)) return -1;
}
encoder.Flush();
return 1 + encoder.len();
}
template <>
template<parquet::Type::type PARQUET_TYPE>
inline int DictDecoder<TimestampValue>::Decode(const uint8_t* buffer,
const uint8_t* buffer_end, int fixed_len_size, TimestampValue* v) {
return timestamp_decoder_.Decode<PARQUET_TYPE>(buffer, buffer_end, v);
}
template<typename T>
template<parquet::Type::type PARQUET_TYPE>
inline bool DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
int fixed_len_size) {
dict_.clear();
ReleaseBytes();
uint8_t* end = dict_buffer + dict_len;
while (dict_buffer < end) {
T value;
int decoded_len = Decode<PARQUET_TYPE>(dict_buffer, end,
fixed_len_size, &value);
if (UNLIKELY(decoded_len < 0)) return false;
dict_buffer += decoded_len;
dict_.push_back(value);
}
ConsumeBytes(sizeof(T) * dict_.size());
return true;
}
}
#endif