Use partitioned aggregation for single-function DISTINCT aggregation.
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0f39b41..eef2c9d 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -19,8 +19,10 @@
#include "storage/AggregationOperationState.hpp"
+#include <numeric>
#include <cstddef>
#include <cstdint>
+#include <functional>
#include <memory>
#include <string>
#include <utility>
@@ -87,6 +89,8 @@
is_aggregate_partitioned_(false),
predicate_(predicate),
is_distinct_(std::move(is_distinct)),
+ all_distinct_(std::accumulate(is_distinct_.begin(), is_distinct_.end(),
+ !is_distinct_.empty(), std::logical_and<bool>())),
storage_manager_(storage_manager) {
if (!group_by.empty()) {
if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
@@ -163,11 +167,6 @@
handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
if (!group_by_key_ids_.empty()) {
- // Aggregation with GROUP BY: combined payload is partially updated in
- // the presence of DISTINCT.
- if (*is_distinct_it) {
- handles_.back()->blockUpdate();
- }
group_by_handles.emplace_back(handles_.back().get());
} else {
// Aggregation without GROUP BY: create a single global state.
@@ -180,17 +179,32 @@
std::vector<const Type *> key_types(group_by_types_);
key_types.insert(
key_types.end(), argument_types.begin(), argument_types.end());
+
// TODO(jianqiao): estimated_num_entries is quite inaccurate for
// estimating the number of entries in the distinctify hash table.
// We need to estimate for each distinct aggregation an
// estimated_num_distinct_keys value during query optimization.
- distinctify_hashtables_.emplace_back(
- AggregationStateHashTableFactory::CreateResizable(
- *distinctify_hash_table_impl_types_it,
- key_types,
- estimated_num_entries,
- {} /* handles */,
- storage_manager));
+ if (is_aggregate_partitioned_) {
+ DCHECK(partitioned_group_by_hashtable_pool_ == nullptr);
+ partitioned_group_by_hashtable_pool_.reset(
+ new PartitionedHashTablePool(estimated_num_entries,
+ FLAGS_num_aggregation_partitions,
+ *distinctify_hash_table_impl_types_it,
+ key_types,
+ {},
+ storage_manager));
+ } else {
+ distinctify_hashtables_.emplace_back(
+ AggregationStateHashTableFactory::CreateResizable(
+ *distinctify_hash_table_impl_types_it,
+ key_types,
+ estimated_num_entries,
+ {} /* handles */,
+ storage_manager));
+
+ // Combined payload is partially updated in the presence of DISTINCT.
+ handles_.back()->blockUpdate();
+ }
++distinctify_hash_table_impl_types_it;
} else {
distinctify_hashtables_.emplace_back(nullptr);
@@ -208,13 +222,24 @@
group_by_handles,
storage_manager));
} else if (is_aggregate_partitioned_) {
- partitioned_group_by_hashtable_pool_.reset(
- new PartitionedHashTablePool(estimated_num_entries,
- FLAGS_num_aggregation_partitions,
- hash_table_impl_type,
- group_by_types_,
- group_by_handles,
- storage_manager));
+ if (all_distinct_) {
+ DCHECK_EQ(1u, group_by_handles.size());
+ DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+ group_by_hashtable_pool_.reset(
+ new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types_,
+ group_by_handles,
+ storage_manager));
+ } else {
+ partitioned_group_by_hashtable_pool_.reset(
+ new PartitionedHashTablePool(estimated_num_entries,
+ FLAGS_num_aggregation_partitions,
+ hash_table_impl_type,
+ group_by_types_,
+ group_by_handles,
+ storage_manager));
+ }
} else {
group_by_hashtable_pool_.reset(
new HashTablePool(estimated_num_entries,
@@ -362,11 +387,13 @@
if (aggregate_functions.empty()) {
return false;
}
- // Check if there's a distinct operation involved in any aggregate, if so
- // the aggregate can't be partitioned.
- for (auto distinct : is_distinct) {
- if (distinct) {
- return false;
+ // If there is only one aggregate function, we allow distinct aggregation.
+ // Otherwise it can't be partitioned with distinct aggregation.
+ if (aggregate_functions.size() > 1) {
+ for (auto distinct : is_distinct) {
+ if (distinct) {
+ return false;
+ }
}
}
// There's no distinct aggregation involved, Check if there's at least one
@@ -384,12 +411,17 @@
}
}
+ // Currently we always use partitioned aggregation to parallelize distinct
+ // aggregation.
+ if (all_distinct_) {
+ return true;
+ }
+
// There are GROUP BYs without DISTINCT. Check if the estimated number of
// groups is large enough to warrant a partitioned aggregation.
return estimated_num_groups >
static_cast<std::size_t>(
FLAGS_partition_aggregation_num_groups_threshold);
- return false;
}
std::size_t AggregationOperationState::getNumInitializationPartitions() const {
@@ -599,10 +631,19 @@
}
ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get());
- partitioned_group_by_hashtable_pool_->getHashTable(partition)
- ->upsertValueAccessorCompositeKey(argument_ids_,
- group_by_key_ids_,
- local_mux);
+ if (all_distinct_) {
+ DCHECK_EQ(1u, handles_.size());
+ handles_.front()->insertValueAccessorIntoDistinctifyHashTable(
+ argument_ids_.front(),
+ group_by_key_ids_,
+ local_mux,
+ partitioned_group_by_hashtable_pool_->getHashTable(partition));
+ } else {
+ partitioned_group_by_hashtable_pool_->getHashTable(partition)
+ ->upsertValueAccessorCompositeKey(argument_ids_,
+ group_by_key_ids_,
+ local_mux);
+ }
}
});
}
@@ -621,13 +662,15 @@
}
}
- AggregationStateHashTableBase *agg_hash_table =
- group_by_hashtable_pool_->getHashTable();
+ if (!all_distinct_) {
+ AggregationStateHashTableBase *agg_hash_table =
+ group_by_hashtable_pool_->getHashTable();
- agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
- group_by_key_ids_,
- accessor_mux);
- group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+ agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
+ group_by_key_ids_,
+ accessor_mux);
+ group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+ }
}
void AggregationOperationState::finalizeAggregate(
@@ -711,10 +754,24 @@
void AggregationOperationState::finalizeHashTableImplPartitioned(
const std::size_t partition_id,
InsertDestination *output_destination) {
- PackedPayloadHashTable *hash_table =
+ PackedPayloadHashTable *partitioned_hash_table =
static_cast<PackedPayloadHashTable *>(
partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
+ PackedPayloadHashTable *hash_table;
+ if (all_distinct_) {
+ DCHECK_EQ(1u, handles_.size());
+ DCHECK(group_by_hashtable_pool_ != nullptr);
+
+ hash_table = static_cast<PackedPayloadHashTable *>(
+ group_by_hashtable_pool_->getHashTable());
+ handles_.front()->aggregateOnDistinctifyHashTableForGroupBy(
+ *partitioned_hash_table, 0, hash_table);
+ partitioned_hash_table->destroyPayload();
+ } else {
+ hash_table = partitioned_hash_table;
+ }
+
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
@@ -790,19 +847,24 @@
// TODO(harshad) - Find heuristics for faster merge, even in a single thread.
// e.g. Keep merging entries from smaller hash tables to larger.
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
- DCHECK(hash_tables != nullptr);
- if (hash_tables->empty()) {
- return;
- }
+ std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr;
- std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr(
- hash_tables->back().release());
- for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
- std::unique_ptr<AggregationStateHashTableBase> hash_table(
- hash_tables->at(i).release());
- mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
- hash_table->destroyPayload();
+ if (all_distinct_) {
+ final_hash_table_ptr.reset(group_by_hashtable_pool_->getHashTable());
+ } else {
+ auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+ DCHECK(hash_tables != nullptr);
+ if (hash_tables->empty()) {
+ return;
+ }
+
+ final_hash_table_ptr.reset(hash_tables->back().release());
+ for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+ std::unique_ptr<AggregationStateHashTableBase> hash_table(
+ hash_tables->at(i).release());
+ mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
+ hash_table->destroyPayload();
+ }
}
PackedPayloadHashTable *final_hash_table =
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c8930ee..6c9690a 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -273,6 +273,9 @@
// arguments.
std::vector<bool> is_distinct_;
+ // Whether all aggregate functions are DISTINCT aggregations.
+ const bool all_distinct_;
+
// Non-trivial group-by/argument expressions that need to be evaluated.
std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 293be17..8b68150 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -817,7 +817,8 @@
quickstep_utility_Alignment
quickstep_utility_HashPair
quickstep_utility_Macros
- quickstep_utility_PrimeNumber)
+ quickstep_utility_PrimeNumber
+ quickstep_utility_TemplateUtil)
target_link_libraries(quickstep_storage_PartitionedHashTablePool
glog
quickstep_storage_HashTableBase
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index bf5eaee..3d672f2 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -40,6 +40,7 @@
#include "utility/Alignment.hpp"
#include "utility/Macros.hpp"
#include "utility/PrimeNumber.hpp"
+#include "utility/TemplateUtil.hpp"
#include "glog/logging.h"
@@ -234,23 +235,31 @@
ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
+ const bool has_derived_accessor = (derived_accessor != nullptr);
+
base_accessor->beginIterationVirtual();
- if (derived_accessor == nullptr) {
- return upsertValueAccessorCompositeKeyInternal<false>(
- argument_ids,
- key_attr_ids,
- base_accessor,
- nullptr);
- } else {
+ if (has_derived_accessor) {
DCHECK(derived_accessor->getImplementationType()
== ValueAccessor::Implementation::kColumnVectors);
derived_accessor->beginIterationVirtual();
- return upsertValueAccessorCompositeKeyInternal<true>(
- argument_ids,
- key_attr_ids,
- base_accessor,
- static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
}
+
+ return InvokeOnBools(
+ has_derived_accessor,
+ handles_.empty(),
+ !all_keys_inline_,
+ [&](auto use_two_accessors, // NOLINT(build/c++11)
+ auto key_only,
+ auto has_variable_size) -> bool {
+ return upsertValueAccessorCompositeKeyInternal<
+ decltype(use_two_accessors)::value,
+ decltype(key_only)::value,
+ decltype(has_variable_size)::value>(
+ argument_ids,
+ key_attr_ids,
+ base_accessor,
+ static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
+ });
}
void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index f87a1de..c49bdb4 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -20,10 +20,12 @@
#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
+#include <numeric>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
+#include <functional>
#include <limits>
#include <vector>
@@ -336,11 +338,12 @@
const std::uint8_t **value,
std::size_t *entry_num) const;
+ template <bool key_only = false>
inline std::uint8_t* upsertCompositeKeyInternal(
const std::vector<TypedValue> &key,
const std::size_t variable_key_size);
- template <bool use_two_accessors>
+ template <bool use_two_accessors, bool key_only, bool has_variable_size>
inline bool upsertValueAccessorCompositeKeyInternal(
const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
const std::vector<MultiSourceAttributeId> &key_ids,
@@ -355,8 +358,9 @@
// comes from a HashTableKeyManager, and is set by the constructor of a
// subclass of HashTable.
inline void setKeyInline(const std::vector<bool> *key_inline) {
- scalar_key_inline_ = key_inline->front();
key_inline_ = key_inline;
+ all_keys_inline_ = std::accumulate(key_inline_->begin(), key_inline_->end(),
+ true, std::logical_and<bool>());
}
inline static std::size_t ComputeTotalPayloadSize(
@@ -407,7 +411,7 @@
// Information about whether key components are stored inline or in a
// separate variable-length storage region. This is usually determined by a
// HashTableKeyManager and set by calling setKeyInline().
- bool scalar_key_inline_;
+ bool all_keys_inline_;
const std::vector<bool> *key_inline_;
const std::size_t num_handles_;
@@ -763,7 +767,7 @@
}
}
-
+template <bool key_only>
inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
const std::vector<TypedValue> &key,
const std::size_t variable_key_size) {
@@ -809,7 +813,9 @@
writeCompositeKeyToBucket(key, hash_code, bucket);
std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
- std::memcpy(value, init_payload_, this->total_payload_size_);
+ if (!key_only) {
+ std::memcpy(value, init_payload_, this->total_payload_size_);
+ }
// Update the previous chaing pointer to point to the new bucket.
pending_chain_ptr->store(pending_chain_ptr_finish_value,
@@ -819,13 +825,13 @@
return value;
}
-template <bool use_two_accessors>
+template <bool use_two_accessors, bool key_only, bool has_variable_size>
inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
const std::vector<MultiSourceAttributeId> &key_ids,
ValueAccessor *base_accessor,
ColumnVectorsValueAccessor *derived_accessor) {
- std::size_t variable_size;
+ std::size_t variable_size = 0;
std::vector<TypedValue> key_vector;
key_vector.resize(key_ids.size());
@@ -848,13 +854,17 @@
&key_vector)) {
continue;
}
- variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
- std::uint8_t *value = this->upsertCompositeKeyInternal(
- key_vector, variable_size);
+ if (has_variable_size) {
+ variable_size =
+ this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+ }
+ std::uint8_t *value =
+ this->template upsertCompositeKeyInternal<key_only>(
+ key_vector, variable_size);
if (value == nullptr) {
continuing = true;
break;
- } else {
+ } else if (!key_only) {
SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
for (unsigned int k = 0; k < num_handles_; ++k) {
const auto &ids = argument_ids[k];
diff --git a/utility/TemplateUtil.hpp b/utility/TemplateUtil.hpp
index 33e4f42..dfae8e4 100644
--- a/utility/TemplateUtil.hpp
+++ b/utility/TemplateUtil.hpp
@@ -30,6 +30,8 @@
* @{
*/
+namespace template_util_inner {
+
/**
* @brief Represents a compile-time sequence of integers.
*
@@ -58,7 +60,6 @@
typedef Sequence<S...> type;
};
-
/**
* @brief Final step of CreateBoolInstantiatedInstance. Now all bool_values are
* ready. Instantiate the template and create (i.e. new) an instance.
@@ -72,6 +73,42 @@
}
/**
+ * @brief Invoke the functor with the compile-time bool values wrapped as
+ * integral_constant types.
+ */
+template <typename FunctorT, bool ...bool_values>
+inline auto InvokeOnBoolsInner(const FunctorT &functor) {
+ return functor(std::integral_constant<bool, bool_values>()...);
+}
+
+/**
+ * @brief Recursive dispatching.
+ */
+template <typename FunctorT, bool ...bool_values, typename ...Bools>
+inline auto InvokeOnBoolsInner(const FunctorT &functor,
+ const bool tparam,
+ const Bools ...rest_params) {
+ if (tparam) {
+ return InvokeOnBoolsInner<FunctorT, bool_values..., true>(
+ functor, rest_params...);
+ } else {
+ return InvokeOnBoolsInner<FunctorT, bool_values..., false>(
+ functor, rest_params...);
+ }
+}
+
+/**
+ * @brief Move the functor to the first position in argument list.
+ */
+template <std::size_t last, std::size_t ...i, typename TupleT>
+inline auto InvokeOnBoolsInner(TupleT &&args, Sequence<i...> &&indices) {
+ return InvokeOnBoolsInner(std::get<last>(std::forward<TupleT>(args)),
+ std::get<i>(std::forward<TupleT>(args))...);
+}
+
+} // namespace template_util_inner
+
+/**
* @brief Edge case of the recursive CreateBoolInstantiatedInstance function
* when all bool variables have been branched and replaced with compile-time
* bool constants.
@@ -85,8 +122,10 @@
// for the tuple, so that the tuple can be unpacked as a sequence of constructor
// parameters in CreateBoolInstantiatedInstanceInner.
constexpr std::size_t n_args = std::tuple_size<Tuple>::value;
- return CreateBoolInstantiatedInstanceInner<T, ReturnT, bool_values...>(
- std::forward<Tuple>(args), typename MakeSequence<n_args>::type());
+ return template_util_inner::CreateBoolInstantiatedInstanceInner<
+ T, ReturnT, bool_values...>(
+ std::forward<Tuple>(args),
+ typename template_util_inner::MakeSequence<n_args>::type());
}
/**
@@ -160,6 +199,35 @@
}
}
+/**
+ * @brief A helper function for bool branched template specialization.
+ *
+ * Usage example:
+ * --
+ * bool c1 = true, c2 = false;
+ *
+ * InvokeOnBools(
+ * c1, c2,
+ * [&](auto c1, auto c2) -> SomeBaseClass* {
+ * using T1 = decltype(c1); // T1 == std::true_type
+ * using T2 = decltype(c2); // T2 == std::false_type
+ *
+ * constexpr bool cv1 = T1::value; // cv1 == true
+ * constexpr bool cv2 = T2::value; // cv2 == false
+ *
+ * SomeFunction<cv1, cv2>(...);
+ * return new SomeClass<cv1, cv2>(...);
+ * });
+ * --
+ */
+template <typename ...ArgTypes>
+inline auto InvokeOnBools(ArgTypes ...args) {
+ constexpr std::size_t last = sizeof...(args) - 1;
+ return template_util_inner::InvokeOnBoolsInner<last>(
+ std::forward_as_tuple(args...),
+ typename template_util_inner::MakeSequence<last>::type());
+}
+
/** @} */
} // namespace quickstep