blob: 277e2e5f98023ed06e35797e9dc59a85142c1e00 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
#define QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <unordered_map>
#include <vector>

#include "catalog/CatalogTypedefs.hpp"
#include "storage/HashTableBase.hpp"
#include "storage/StorageBlob.hpp"
#include "storage/StorageConstants.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "types/containers/ColumnVector.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
// Forward declarations. This header only refers to these types through
// pointers (AggregationHandle*, StorageManager*, const Type*), so their full
// definitions are not needed here.
class AggregationHandle;
class StorageManager;
class Type;
/**
* @brief Specialized aggregation hash table that is preferable for two-phase
* aggregation with small-cardinality group-by keys. To use this hash
* table, it also requires that the group-by keys have fixed-length types
* with total byte size no greater than 8 (so that the keys can be packed
* into a 64-bit QWORD).
*/
class ThreadPrivateCompactKeyHashTable : public AggregationStateHashTableBase {
public:
/**
* @brief Constructor.
*
* @param key_types A vector of one or more types (>1 indicates a composite
* key).
* @param num_entries The estimated number of entries this hash table will
* hold.
* @param handles The aggregation handles.
* @param storage_manager The StorageManager to use (a StorageBlob will be
* allocated to hold this hash table's contents).
**/
ThreadPrivateCompactKeyHashTable(
const std::vector<const Type*> &key_types,
const std::size_t num_entries,
const std::vector<AggregationHandle*> &handles,
StorageManager *storage_manager);
~ThreadPrivateCompactKeyHashTable() override;
HashTableImplType getImplType() const override {
return HashTableImplType::kThreadPrivateCompactKey;
}
void destroyPayload() override {}
std::size_t getMemoryConsumptionBytes() const override {
return blob_->size();
}
/**
* @return The number of entries in this HashTable.
**/
inline std::size_t numEntries() const {
return buckets_allocated_;
}
bool upsertValueAccessorCompositeKey(
const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
const std::vector<MultiSourceAttributeId> &key_attr_ids,
const ValueAccessorMultiplexer &accessor_mux) override;
/**
* @brief Merge the states of \p source into this hash table.
*
* @param source The source hash table from which the states are to be merged
* into this hash table.
*/
void mergeFrom(const ThreadPrivateCompactKeyHashTable &source);
/**
* @brief Finalize all the aggregation state vectors and add the result column
* vectors into the output ColumnVectorsValueAccessor.
*
* @param output The ColumnVectorsValueAccessor to add all the result column
* vectors into.
*/
void finalize(ColumnVectorsValueAccessor *output) const;
private:
// Compact key as a 64-bit QWORD.
using KeyCode = std::uint64_t;
static constexpr std::size_t kKeyCodeSize = sizeof(KeyCode);
using BucketIndex = std::uint32_t;
inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
}
// Grow the size of this hash table by a factor of 2.
void resize();
template <std::size_t key_size>
inline static void ConstructKeyCode(const std::size_t offset,
const attribute_id attr_id,
ValueAccessor *accessor,
void *key_code_start) {
InvokeOnAnyValueAccessor(
accessor,
[&](auto *accessor) -> void { // NOLINT(build/c++11)
char *key_code_ptr = static_cast<char*>(key_code_start) + offset;
accessor->beginIteration();
while (accessor->next()) {
std::memcpy(key_code_ptr,
accessor->template getUntypedValue<false>(attr_id),
key_size);
key_code_ptr += kKeyCodeSize;
}
});
}
inline void upsertValueAccessorCount(const std::vector<BucketIndex> &bucket_indices,
void *state_vec) {
std::int64_t *states = static_cast<std::int64_t*>(state_vec);
for (const BucketIndex idx : bucket_indices) {
states[idx] += 1;
}
}
template <typename ArgumentT, typename StateT>
inline void upsertValueAccessorSum(const std::vector<BucketIndex> &bucket_indices,
const attribute_id attr_id,
ValueAccessor *accessor,
void *state_vec) {
InvokeOnAnyValueAccessor(
accessor,
[&](auto *accessor) -> void { // NOLINT(build/c++11)
accessor->beginIteration();
StateT *states = static_cast<StateT*>(state_vec);
for (const BucketIndex idx : bucket_indices) {
accessor->next();
states[idx] += *static_cast<const ArgumentT*>(
accessor->template getUntypedValue<false>(attr_id));
}
});
}
template <typename StateT>
inline void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices,
const void *src_state_vec,
void *dst_state_vec) {
StateT *dst_states = static_cast<StateT*>(dst_state_vec);
const StateT* src_states = static_cast<const StateT*>(src_state_vec);
for (std::size_t i = 0; i < dst_bucket_indices.size(); ++i) {
dst_states[dst_bucket_indices[i]] += src_states[i];
}
}
template <std::size_t key_size>
inline void finalizeKey(const std::size_t offset,
NativeColumnVector *output_cv) const {
const char *key_ptr = reinterpret_cast<const char*>(keys_) + offset;
for (std::size_t i = 0; i < buckets_allocated_; ++i) {
std::memcpy(output_cv->getPtrForDirectWrite(),
key_ptr,
key_size);
key_ptr += kKeyCodeSize;
}
}
template <typename StateT, typename ResultT>
inline void finalizeStateSum(const void *state_vec,
NativeColumnVector *output_cv) const {
const StateT *states = static_cast<const StateT*>(state_vec);
for (std::size_t i = 0; i < buckets_allocated_; ++i) {
*static_cast<ResultT*>(output_cv->getPtrForDirectWrite()) = states[i];
}
}
const std::vector<const Type*> key_types_;
const std::vector<AggregationHandle *> handles_;
std::vector<std::size_t> key_sizes_;
std::vector<std::size_t> state_sizes_;
std::size_t total_state_size_;
std::size_t num_buckets_;
std::size_t buckets_allocated_;
// Maps a compact-key to its bucket location.
std::unordered_map<KeyCode, BucketIndex> index_;
// Compact-key array where keys_[i] holds the compact-key for bucket i.
KeyCode *keys_;
// Use a column-wise layout for aggregation states.
std::vector<void*> state_vecs_;
StorageManager *storage_manager_;
MutableBlobReference blob_;
DISALLOW_COPY_AND_ASSIGN(ThreadPrivateCompactKeyHashTable);
};
} // namespace quickstep
#endif // QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_