// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_UTIL_DICT_ENCODING_H
#define IMPALA_UTIL_DICT_ENCODING_H

#include <map>
#include <boost/unordered_map.hpp>

#include "gutil/bits.h"
#include "gutil/strings/substitute.h"
#include "exec/parquet-common.h"
#include "runtime/mem-pool.h"
#include "runtime/string-value.h"
#include "util/rle-encoding.h"

namespace impala {

/// See the dictionary encoding section of https://github.com/Parquet/parquet-format.
/// This class supports dictionary encoding of all Impala types.
/// The encoder supports streaming: values are encoded as they are added while
/// the dictionary is being constructed. At any time, the buffered values can be
/// written out with the current dictionary size. More values can then be added to
/// the encoder, including new dictionary entries.
/// TODO: if the dictionary was made to be ordered, the dictionary would compress better.
/// Add this to the spec as a future improvement.
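///
/// Example of the streaming usage described above. This is an illustrative sketch, not
/// code taken from Impala's callers; 'pool' (a MemPool*), 'values', 'dict_buffer',
/// 'data_buffer' and 'data_buffer_len' are assumed to be set up by the caller, with the
/// buffers sized as noted below.
///
///   DictEncoder<int32_t> encoder(pool, sizeof(int32_t));
///   for (int32_t v : values) {
///     // Put() only buffers the value's index; nothing is written yet.
///     if (encoder.Put(v) < 0) { /* dictionary full: fall back to plain encoding */ }
///   }
///   encoder.WriteDict(dict_buffer);  // dict_buffer holds dict_encoded_size() bytes.
///   // data_buffer_len should come from EstimatedDataEncodedSize().
///   int data_len = encoder.WriteData(data_buffer, data_buffer_len);
///   encoder.ClearIndices();  // More values (and dictionary entries) can now be added.
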
/// Base class for encoders. This is convenient so users can have a type that
/// abstracts over the actual dictionary type.
/// Note: it does not provide a virtual Put(). Users are expected to know the subclass
/// type when using Put().
/// TODO: once we can easily remove virtual calls with codegen, this interface can
/// rely less on templating and be easier to follow. The type should be passed in
/// as an argument rather than as a template argument.
class DictEncoderBase {
public:
virtual ~DictEncoderBase() {
DCHECK(buffered_indices_.empty());
}
/// Writes out the encoded dictionary to buffer. buffer must be preallocated to
/// dict_encoded_size() bytes.
virtual void WriteDict(uint8_t* buffer) = 0;
/// The number of entries in the dictionary.
virtual int num_entries() const = 0;
/// Clears all the indices (but leaves the dictionary).
void ClearIndices() { buffered_indices_.clear(); }
/// Returns a conservative estimate of the number of bytes needed to encode the buffered
/// indices. Used to size the buffer passed to WriteData().
int EstimatedDataEncodedSize() {
return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size());
}
/// The minimum bit width required to encode the currently buffered indices.
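/// For example, a dictionary with 5 entries needs Bits::Log2Ceiling64(5) = 3 bits per
/// index.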
int bit_width() const {
if (UNLIKELY(num_entries() == 0)) return 0;
if (UNLIKELY(num_entries() == 1)) return 1;
return Bits::Log2Ceiling64(num_entries());
}
/// Writes out any buffered indices to buffer preceded by the bit width of this data.
/// Returns the number of bytes written.
/// If the supplied buffer is not big enough, returns -1.
/// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize()
/// to size buffer.
int WriteData(uint8_t* buffer, int buffer_len);
int dict_encoded_size() { return dict_encoded_size_; }
protected:
DictEncoderBase(MemPool* pool)
: dict_encoded_size_(0), pool_(pool) {
}
/// Indices that have not yet been written out by WriteData().
std::vector<int> buffered_indices_;
/// The number of bytes needed to encode the dictionary.
int dict_encoded_size_;
/// Pool to store StringValue data. Not owned.
MemPool* pool_;
};

template<typename T>
class DictEncoder : public DictEncoderBase {
public:
DictEncoder(MemPool* pool, int encoded_value_size) :
DictEncoderBase(pool), buckets_(HASH_TABLE_SIZE, Node::INVALID_INDEX),
encoded_value_size_(encoded_value_size) { }
/// Encode value. Returns the number of bytes added to the dictionary page length
/// (will be 0 if this value is already in the dictionary) or -1 if the dictionary is
/// full (in which case the caller should give up on dictionary encoding). Note that
/// this does not actually write any data, just buffers the value's index to be
/// written later.
int Put(const T& value);
virtual void WriteDict(uint8_t* buffer);
virtual int num_entries() const { return nodes_.size(); }
private:
/// Size of the table. Must be a power of 2.
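/// Being a power of two lets Put() select a bucket with a bitwise AND,
/// hash & (HASH_TABLE_SIZE - 1), instead of a modulo.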
enum { HASH_TABLE_SIZE = 1 << 16 };
/// Dictates an upper bound on the capacity of the hash table.
typedef uint16_t NodeIndex;
/// Hash table mapping value to dictionary index (i.e. the number used to encode this
/// value in the data). Each table entry is an index into the nodes_ vector (giving the
/// first node of a chain for this bucket) or Node::INVALID_INDEX for an empty bucket.
std::vector<NodeIndex> buckets_;
/// Node in the chained hash table.
struct Node {
Node(const T& v, const NodeIndex& n) : value(v), next(n) { }
/// The dictionary value.
T value;
/// Index into nodes_ for the next Node in the chain. INVALID_INDEX indicates end.
NodeIndex next;
/// The maximum number of values in the dictionary. Chosen to be around 60% of
/// HASH_TABLE_SIZE to limit the expected length of the chains.
enum { INVALID_INDEX = 40000 };
};
/// The nodes of the hash table. Ordered by dictionary index (and so also represents
/// the reverse mapping from encoded index to value).
std::vector<Node> nodes_;
/// Size of each encoded dictionary value. -1 for variable-length types.
int encoded_value_size_;
/// Hash function for mapping a value to a bucket.
inline uint32_t Hash(const T& value) const;
/// Adds value to the hash table and updates dict_encoded_size_. Returns the
/// number of bytes added to dict_encoded_size_.
/// bucket gives a pointer to the location (i.e. chain) to add the value
/// so that the hash for value doesn't need to be recomputed.
int AddToTable(const T& value, NodeIndex* bucket);
};

/// Decoder class for dictionary encoded data. This class does not allocate any
/// buffers. The input buffers (dictionary buffer and RLE buffer) must be maintained
/// by the caller and valid as long as this object is.
class DictDecoderBase {
public:
/// Sets the buffer containing the RLE-encoded indices into the dictionary. Returns an
/// error status if the buffer is too short or the bit_width metadata in the buffer is
/// invalid.
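/// The expected layout matches what DictEncoderBase::WriteData() produces: a single
/// byte holding the bit width followed by the RLE/bit-packed indices (see
/// util/rle-encoding.h).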
Status SetData(uint8_t* buffer, int buffer_len) {
DCHECK_GE(buffer_len, 0);
if (UNLIKELY(buffer_len == 0)) return Status("Dictionary cannot be 0 bytes");
uint8_t bit_width = *buffer;
if (UNLIKELY(bit_width < 0 || bit_width > BitReader::MAX_BITWIDTH)) {
return Status(strings::Substitute("Dictionary has invalid or unsupported bit "
"width: $0", bit_width));
}
++buffer;
--buffer_len;
data_decoder_.Reset(buffer, buffer_len, bit_width);
return Status::OK();
}
virtual ~DictDecoderBase() {}
virtual int num_entries() const = 0;
protected:
RleDecoder data_decoder_;
};

template<typename T>
class DictDecoder : public DictDecoderBase {
public:
/// Construct empty dictionary.
DictDecoder() {}
/// Initialize the decoder with an input buffer containing the dictionary.
/// 'dict_len' is the byte length of dict_buffer.
/// For string data, the decoder returns StringValues with data directly from
/// dict_buffer (i.e. no copies).
/// fixed_len_size is the size that must be passed to decode fixed-length
/// dictionary values (values stored using FIXED_LEN_BYTE_ARRAY).
/// Returns true if the dictionary values were all successfully decoded, or false
/// if the dictionary was corrupt.
bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
virtual int num_entries() const { return dict_.size(); }
/// Returns the next value. Returns false if the data is invalid.
/// For StringValues, this does not make a copy of the data. Instead,
/// the string data points into the dictionary buffer passed into Reset().
bool GetValue(T* value);
private:
std::vector<T> dict_;
};
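
/// Example of decoding the buffers produced by DictEncoder. This is an illustrative
/// sketch, not code taken from Impala's callers; 'num_values' is assumed to be known
/// from elsewhere (e.g. the Parquet page header).
///
///   DictDecoder<int32_t> decoder;
///   if (!decoder.Reset(dict_buffer, dict_len, sizeof(int32_t))) { /* corrupt dict */ }
///   Status status = decoder.SetData(data_buffer, data_len);
///   if (!status.ok()) { /* invalid or unsupported bit width */ }
///   int32_t v;
///   for (int i = 0; i < num_values; ++i) {
///     if (!decoder.GetValue(&v)) { /* invalid data */ }
///   }
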
template<typename T>
inline int DictEncoder<T>::Put(const T& value) {
NodeIndex* bucket = &buckets_[Hash(value) & (HASH_TABLE_SIZE - 1)];
NodeIndex i = *bucket;
// Look for the value in the dictionary.
while (i != Node::INVALID_INDEX) {
const Node* n = &nodes_[i];
if (LIKELY(n->value == value)) {
// Value already in dictionary.
buffered_indices_.push_back(i);
return 0;
}
i = n->next;
}
// Value not found. Add it to the dictionary if there's space.
i = nodes_.size();
if (UNLIKELY(i >= Node::INVALID_INDEX)) return -1;
buffered_indices_.push_back(i);
return AddToTable(value, bucket);
}

template<typename T>
inline uint32_t DictEncoder<T>::Hash(const T& value) const {
return HashUtil::Hash(&value, sizeof(value), 0);
}

template<>
inline uint32_t DictEncoder<StringValue>::Hash(const StringValue& value) const {
return HashUtil::Hash(value.ptr, value.len, 0);
}

template<typename T>
inline int DictEncoder<T>::AddToTable(const T& value, NodeIndex* bucket) {
DCHECK_GT(encoded_value_size_, 0);
// Prepend the new node to this bucket's chain.
nodes_.push_back(Node(value, *bucket));
*bucket = nodes_.size() - 1;
dict_encoded_size_ += encoded_value_size_;
return encoded_value_size_;
}

template<>
inline int DictEncoder<StringValue>::AddToTable(const StringValue& value,
NodeIndex* bucket) {
char* ptr_copy = reinterpret_cast<char*>(pool_->Allocate(value.len));
memcpy(ptr_copy, value.ptr, value.len);
StringValue sv(ptr_copy, value.len);
// Prepend the new node to this bucket's chain.
nodes_.push_back(Node(sv, *bucket));
*bucket = nodes_.size() - 1;
int bytes_added = ParquetPlainEncoder::ByteSize(sv);
dict_encoded_size_ += bytes_added;
return bytes_added;
}

template<typename T>
inline bool DictDecoder<T>::GetValue(T* value) {
int index = -1; // Initialize to avoid compiler warning.
bool result = data_decoder_.Get(&index);
// Use & to avoid branches.
if (LIKELY(result & (index >= 0) & (index < dict_.size()))) {
*value = dict_[index];
return true;
}
return false;
}

template<>
inline bool DictDecoder<Decimal16Value>::GetValue(Decimal16Value* value) {
int index;
bool result = data_decoder_.Get(&index);
if (!result) return false;
if (index >= dict_.size()) return false;
// Workaround for IMPALA-959. Use memcpy instead of '=' so addresses
// do not need to be 16 byte aligned.
uint8_t* addr = reinterpret_cast<uint8_t*>(&dict_[0]);
addr = addr + index * sizeof(*value);
memcpy(value, addr, sizeof(*value));
return true;
}

template<typename T>
inline void DictEncoder<T>::WriteDict(uint8_t* buffer) {
for (const Node& node: nodes_) {
buffer += ParquetPlainEncoder::Encode(buffer, encoded_value_size_, node.value);
}
}

inline int DictEncoderBase::WriteData(uint8_t* buffer, int buffer_len) {
// Write bit width in first byte
*buffer = bit_width();
++buffer;
--buffer_len;
RleEncoder encoder(buffer, buffer_len, bit_width());
for (int index: buffered_indices_) {
if (!encoder.Put(index)) return -1;
}
encoder.Flush();
return 1 + encoder.len();
}

template<typename T>
inline bool DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
int fixed_len_size) {
dict_.clear();
uint8_t* end = dict_buffer + dict_len;
while (dict_buffer < end) {
T value;
int decoded_len =
ParquetPlainEncoder::Decode(dict_buffer, end, fixed_len_size, &value);
if (UNLIKELY(decoded_len < 0)) return false;
dict_buffer += decoded_len;
dict_.push_back(value);
}
return true;
}

}
#endif