/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "storage/AggregationOperationState.hpp"
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "catalog/CatalogDatabaseLite.hpp"
#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/ExpressionFactories.hpp"
#include "expressions/Expressions.pb.h"
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunctionFactory.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "storage/AggregationOperationState.pb.h"
#include "storage/CollisionFreeVectorTable.hpp"
#include "storage/HashTableFactory.hpp"
#include "storage/HashTableBase.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/PackedPayloadHashTable.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "storage/SubBlocksReference.hpp"
#include "storage/TupleIdSequence.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
namespace quickstep {

DEFINE_int32(num_aggregation_partitions,
             41,
             "The number of partitions used for performing the aggregation");

DEFINE_int32(partition_aggregation_num_groups_threshold,
             100,
             "The threshold used for deciding whether the aggregation is done "
             "in a partitioned way or not");
AggregationOperationState::AggregationOperationState(
const CatalogRelationSchema &input_relation,
const std::vector<const AggregateFunction *> &aggregate_functions,
std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
const std::size_t estimated_num_entries,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager)
: input_relation_(input_relation),
is_aggregate_collision_free_(false),
is_aggregate_partitioned_(false),
predicate_(predicate),
is_distinct_(std::move(is_distinct)),
storage_manager_(storage_manager) {
if (!group_by.empty()) {
if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
is_aggregate_collision_free_ = true;
} else {
is_aggregate_partitioned_ = checkAggregatePartitioned(
estimated_num_entries, is_distinct_, group_by, aggregate_functions);
}
}
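
  // At this point at most one grouped-aggregation strategy has been chosen:
  // a collision-free vector table, a single shared partitioned hash table,
  // or (the default) a pool of thread-private hash tables that are merged at
  // finalization time.
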
// Sanity checks: each aggregate has a corresponding list of arguments.
DCHECK(aggregate_functions.size() == arguments.size());
// Get the types of GROUP BY expressions for creating HashTables below.
for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
group_by_types_.emplace_back(&group_by_element->getType());
}
// Prepare group-by key ids and non-trivial expressions.
for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
const attribute_id attr_id =
group_by_element->getAttributeIdForValueAccessor();
if (attr_id != kInvalidAttributeID) {
group_by_key_ids_.emplace_back(ValueAccessorSource::kBase, attr_id);
} else {
group_by_key_ids_.emplace_back(ValueAccessorSource::kDerived,
non_trivial_expressions_.size());
non_trivial_expressions_.emplace_back(group_by_element.release());
}
}
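  // For example, with "GROUP BY x, x + 1": 'x' is read directly from the
  // input block as (kBase, attr_id), while 'x + 1' must be precomputed, so it
  // is referenced as (kDerived, index) into 'non_trivial_expressions_'.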
std::vector<AggregationHandle *> group_by_handles;
// Set up each individual aggregate in this operation.
std::vector<const AggregateFunction *>::const_iterator agg_func_it =
aggregate_functions.begin();
std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
args_it = arguments.begin();
std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
std::vector<HashTableImplType>::const_iterator
distinctify_hash_table_impl_types_it =
distinctify_hash_table_impl_types.begin();
for (; agg_func_it != aggregate_functions.end();
++agg_func_it, ++args_it, ++is_distinct_it) {
// Get the Types of this aggregate's arguments so that we can create an
// AggregationHandle.
std::vector<const Type *> argument_types;
for (const std::unique_ptr<const Scalar> &argument : *args_it) {
argument_types.emplace_back(&argument->getType());
}
// Prepare argument attribute ids and non-trivial expressions.
std::vector<MultiSourceAttributeId> argument_ids;
for (std::unique_ptr<const Scalar> &argument : *args_it) {
const attribute_id attr_id =
argument->getAttributeIdForValueAccessor();
if (attr_id != kInvalidAttributeID) {
argument_ids.emplace_back(ValueAccessorSource::kBase, attr_id);
} else {
argument_ids.emplace_back(ValueAccessorSource::kDerived,
non_trivial_expressions_.size());
non_trivial_expressions_.emplace_back(argument.release());
}
}
argument_ids_.emplace_back(std::move(argument_ids));
// Sanity checks: aggregate function exists and can apply to the specified
// arguments.
DCHECK(*agg_func_it != nullptr);
DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
// Have the AggregateFunction create an AggregationHandle that we can use
// to do actual aggregate computation.
handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
if (!group_by_key_ids_.empty()) {
// Aggregation with GROUP BY: combined payload is partially updated in
// the presence of DISTINCT.
if (*is_distinct_it) {
handles_.back()->blockUpdate();
}
group_by_handles.emplace_back(handles_.back().get());
} else {
// Aggregation without GROUP BY: create a single global state.
single_states_.emplace_back(handles_.back()->createInitialState());
}
// Initialize the corresponding distinctify hash table if this is a
// DISTINCT aggregation.
if (*is_distinct_it) {
std::vector<const Type *> key_types(group_by_types_);
key_types.insert(
key_types.end(), argument_types.begin(), argument_types.end());
// TODO(jianqiao): estimated_num_entries is quite inaccurate for
// estimating the number of entries in the distinctify hash table.
// We need to estimate for each distinct aggregation an
// estimated_num_distinct_keys value during query optimization.
distinctify_hashtables_.emplace_back(
AggregationStateHashTableFactory::CreateResizable(
*distinctify_hash_table_impl_types_it,
key_types,
estimated_num_entries,
{} /* handles */,
storage_manager));
++distinctify_hash_table_impl_types_it;
} else {
distinctify_hashtables_.emplace_back(nullptr);
}
}
if (!group_by_key_ids_.empty()) {
// Aggregation with GROUP BY: create the hash table (pool).
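    // A collision-free table is allocated as one monolithic vector, a
    // partitioned table as a single shared PackedPayloadHashTable (note the
    // 2x entry estimate, presumably to reduce the chance of resizing), and
    // the default case as a HashTablePool from which worker threads check out
    // private tables.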
if (is_aggregate_collision_free_) {
collision_free_hashtable_.reset(
AggregationStateHashTableFactory::CreateResizable(
hash_table_impl_type,
group_by_types_,
estimated_num_entries,
group_by_handles,
storage_manager));
} else if (is_aggregate_partitioned_) {
partitioned_hashtable_.reset(
AggregationStateHashTableFactory::CreateResizable(
hash_table_impl_type,
group_by_types_,
estimated_num_entries * 2,
group_by_handles,
storage_manager_));
} else {
group_by_hashtable_pool_.reset(
new HashTablePool(estimated_num_entries,
hash_table_impl_type,
group_by_types_,
group_by_handles,
storage_manager));
}
}
}
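
// A rough sketch of how a caller typically drives this state (the driving
// loop below is illustrative, not part of this class):
//
//   AggregationOperationState state(...);
//   for (std::size_t p = 0; p < state.getNumInitializationPartitions(); ++p) {
//     state.initialize(p);  // Only used by collision-free aggregation.
//   }
//   for (const block_id block : input_blocks) {
//     state.aggregateBlock(block, lip_filter_prober);  // Possibly concurrent.
//   }
//   for (std::size_t p = 0; p < state.getNumFinalizationPartitions(); ++p) {
//     state.finalizeAggregate(p, output_destination);
//   }
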
AggregationOperationState* AggregationOperationState::ReconstructFromProto(
const serialization::AggregationOperationState &proto,
const CatalogDatabaseLite &database,
StorageManager *storage_manager) {
DCHECK(ProtoIsValid(proto, database));
  // Rebuild constructor arguments from their representation in 'proto'.
std::vector<const AggregateFunction *> aggregate_functions;
std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments;
std::vector<bool> is_distinct;
std::vector<HashTableImplType> distinctify_hash_table_impl_types;
std::size_t distinctify_hash_table_impl_type_index = 0;
for (int agg_idx = 0; agg_idx < proto.aggregates_size(); ++agg_idx) {
const serialization::Aggregate &agg_proto = proto.aggregates(agg_idx);
aggregate_functions.emplace_back(
&AggregateFunctionFactory::ReconstructFromProto(agg_proto.function()));
arguments.emplace_back();
arguments.back().reserve(agg_proto.argument_size());
for (int argument_idx = 0; argument_idx < agg_proto.argument_size();
++argument_idx) {
arguments.back().emplace_back(ScalarFactory::ReconstructFromProto(
agg_proto.argument(argument_idx), database));
}
is_distinct.emplace_back(agg_proto.is_distinct());
if (agg_proto.is_distinct()) {
distinctify_hash_table_impl_types.emplace_back(
HashTableImplTypeFromProto(proto.distinctify_hash_table_impl_types(
distinctify_hash_table_impl_type_index)));
++distinctify_hash_table_impl_type_index;
}
}
std::vector<std::unique_ptr<const Scalar>> group_by_expressions;
for (int group_by_idx = 0; group_by_idx < proto.group_by_expressions_size();
++group_by_idx) {
group_by_expressions.emplace_back(ScalarFactory::ReconstructFromProto(
proto.group_by_expressions(group_by_idx), database));
}
std::unique_ptr<Predicate> predicate;
if (proto.has_predicate()) {
predicate.reset(
PredicateFactory::ReconstructFromProto(proto.predicate(), database));
}
return new AggregationOperationState(
database.getRelationSchemaById(proto.relation_id()),
aggregate_functions,
std::move(arguments),
std::move(is_distinct),
std::move(group_by_expressions),
predicate.release(),
proto.estimated_num_entries(),
HashTableImplTypeFromProto(proto.hash_table_impl_type()),
distinctify_hash_table_impl_types,
storage_manager);
}

bool AggregationOperationState::ProtoIsValid(
const serialization::AggregationOperationState &proto,
const CatalogDatabaseLite &database) {
if (!proto.IsInitialized() ||
!database.hasRelationWithId(proto.relation_id()) ||
(proto.aggregates_size() < 0)) {
return false;
}
std::size_t num_distinctify_hash_tables =
proto.distinctify_hash_table_impl_types_size();
std::size_t distinctify_hash_table_impl_type_index = 0;
for (int i = 0; i < proto.aggregates_size(); ++i) {
if (!AggregateFunctionFactory::ProtoIsValid(
proto.aggregates(i).function())) {
return false;
}
// TODO(chasseur): We may also want to check that the specified
// AggregateFunction is applicable to the specified arguments, but that
// requires partial deserialization and may be too heavyweight for this
// method.
for (int argument_idx = 0;
argument_idx < proto.aggregates(i).argument_size();
++argument_idx) {
if (!ScalarFactory::ProtoIsValid(
proto.aggregates(i).argument(argument_idx), database)) {
return false;
}
}
if (proto.aggregates(i).is_distinct()) {
if (distinctify_hash_table_impl_type_index >=
num_distinctify_hash_tables ||
!serialization::HashTableImplType_IsValid(
proto.distinctify_hash_table_impl_types(
distinctify_hash_table_impl_type_index))) {
return false;
}
}
}
for (int i = 0; i < proto.group_by_expressions_size(); ++i) {
if (!ScalarFactory::ProtoIsValid(proto.group_by_expressions(i), database)) {
return false;
}
}
if (proto.group_by_expressions_size() > 0) {
if (!proto.has_hash_table_impl_type() ||
!serialization::HashTableImplType_IsValid(
proto.hash_table_impl_type())) {
return false;
}
}
if (proto.has_predicate()) {
if (!PredicateFactory::ProtoIsValid(proto.predicate(), database)) {
return false;
}
}
return true;
}

bool AggregationOperationState::checkAggregatePartitioned(
const std::size_t estimated_num_groups,
const std::vector<bool> &is_distinct,
const std::vector<std::unique_ptr<const Scalar>> &group_by,
const std::vector<const AggregateFunction *> &aggregate_functions) const {
// If there's no aggregation, return false.
if (aggregate_functions.empty()) {
return false;
}
  // If any aggregate involves a DISTINCT operation, the aggregation can't be
  // partitioned.
for (auto distinct : is_distinct) {
if (distinct) {
return false;
}
}
  // There's no DISTINCT aggregation involved. Check if there's at least one
  // GROUP BY operation.
if (group_by.empty()) {
return false;
}
  // Currently we require that all the group-by keys are ScalarAttributes for
  // the convenience of implementing copy elision.
  // TODO(jianqiao): relax this requirement.
for (const auto &group_by_element : group_by) {
if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
return false;
}
}
  // There are GROUP BYs without DISTINCT. Check if the estimated number of
  // groups is large enough to warrant a partitioned aggregation.
  return estimated_num_groups >
         static_cast<std::size_t>(
             FLAGS_partition_aggregation_num_groups_threshold);
}

std::size_t AggregationOperationState::getNumInitializationPartitions() const {
if (is_aggregate_collision_free_) {
return static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->getNumInitializationPartitions();
} else {
return 0u;
}
}

std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
if (is_aggregate_collision_free_) {
return static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->getNumFinalizationPartitions();
} else if (is_aggregate_partitioned_) {
return static_cast<PackedPayloadHashTable *>(
partitioned_hashtable_.get())->getNumFinalizationPartitions();
} else {
return 1u;
}
}

CollisionFreeVectorTable* AggregationOperationState
::getCollisionFreeVectorTable() const {
return static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get());
}

void AggregationOperationState::initialize(const std::size_t partition_id) {
if (is_aggregate_collision_free_) {
static_cast<CollisionFreeVectorTable *>(
collision_free_hashtable_.get())->initialize(partition_id);
} else {
LOG(FATAL) << "AggregationOperationState::initialize() "
<< "is not supported by this aggregation";
}
}

void AggregationOperationState::aggregateBlock(const block_id input_block,
LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
BlockReference block(
storage_manager_->getBlock(input_block, input_relation_));
const auto &tuple_store = block->getTupleStorageSubBlock();
std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
std::unique_ptr<ValueAccessor> shared_accessor;
ValueAccessor *accessor = base_accessor.get();
// Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
// as the existence map for the tuples.
std::unique_ptr<TupleIdSequence> matches;
if (predicate_ != nullptr) {
matches.reset(block->getMatchesForPredicate(predicate_.get()));
shared_accessor.reset(
base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
accessor = shared_accessor.get();
}
if (lip_filter_adaptive_prober != nullptr) {
matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
shared_accessor.reset(
base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
accessor = shared_accessor.get();
}
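  // Note that when both a predicate and LIP filters are present, the LIP
  // filters probe the already predicate-filtered accessor, so the resulting
  // 'matches' is a subset of the predicate matches and is re-applied on top
  // of the base accessor.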
std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results;
if (!non_trivial_expressions_.empty()) {
non_trivial_results.reset(new ColumnVectorsValueAccessor());
SubBlocksReference sub_blocks_ref(tuple_store,
block->getIndices(),
block->getIndicesConsistent());
for (const auto &expression : non_trivial_expressions_) {
non_trivial_results->addColumn(
expression->getAllValues(accessor, &sub_blocks_ref));
}
}
accessor->beginIterationVirtual();
ValueAccessorMultiplexer accessor_mux(accessor, non_trivial_results.get());
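  // 'accessor_mux' pairs the base accessor (which resolves kBase attribute
  // ids) with the derived accessor of precomputed expression results (which
  // resolves kDerived ids), mirroring the MultiSourceAttributeId mapping set
  // up in the constructor.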
if (group_by_key_ids_.empty()) {
aggregateBlockSingleState(accessor_mux);
} else {
aggregateBlockHashTable(accessor_mux);
}
}

void AggregationOperationState::aggregateBlockSingleState(
const ValueAccessorMultiplexer &accessor_mux) {
// Aggregate per-block state for each aggregate.
std::vector<std::unique_ptr<AggregationState>> local_state;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
const auto &argument_ids = argument_ids_[agg_idx];
const auto &handle = handles_[agg_idx];
AggregationState *state = nullptr;
if (is_distinct_[agg_idx]) {
handle->insertValueAccessorIntoDistinctifyHashTable(
argument_ids,
{},
accessor_mux,
distinctify_hashtables_[agg_idx].get());
} else {
if (argument_ids.empty()) {
// Special case. This is a nullary aggregate (i.e. COUNT(*)).
ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
DCHECK(base_accessor != nullptr);
state = handle->accumulateNullary(base_accessor->getNumTuplesVirtual());
} else {
// Have the AggregationHandle actually do the aggregation.
state = handle->accumulateValueAccessor(argument_ids, accessor_mux);
}
}
local_state.emplace_back(state);
}
// Merge per-block aggregation states back with global state.
mergeSingleState(local_state);
}

void AggregationOperationState::mergeSingleState(
const std::vector<std::unique_ptr<AggregationState>> &local_state) {
DCHECK_EQ(local_state.size(), single_states_.size());
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (!is_distinct_[agg_idx]) {
handles_[agg_idx]->mergeStates(*local_state[agg_idx],
single_states_[agg_idx].get());
}
}
}

void AggregationOperationState::mergeGroupByHashTables(
AggregationStateHashTableBase *src,
AggregationStateHashTableBase *dst) const {
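  // Visit every (composite key, payload) entry in 'src' and upsert it into
  // 'dst'.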
HashTableMerger merger(static_cast<PackedPayloadHashTable *>(dst));
static_cast<PackedPayloadHashTable *>(src)->forEachCompositeKey(&merger);
}

void AggregationOperationState::aggregateBlockHashTable(
const ValueAccessorMultiplexer &accessor_mux) {
if (is_aggregate_collision_free_) {
aggregateBlockHashTableImplCollisionFree(accessor_mux);
} else if (is_aggregate_partitioned_) {
aggregateBlockHashTableImplPartitioned(accessor_mux);
} else {
aggregateBlockHashTableImplThreadPrivate(accessor_mux);
}
}

void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
const ValueAccessorMultiplexer &accessor_mux) {
DCHECK(collision_free_hashtable_ != nullptr);
collision_free_hashtable_->upsertValueAccessorCompositeKey(argument_ids_,
group_by_key_ids_,
accessor_mux);
}

void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
const ValueAccessorMultiplexer &accessor_mux) {
DCHECK(partitioned_hashtable_ != nullptr);
partitioned_hashtable_->upsertValueAccessorCompositeKey(argument_ids_,
group_by_key_ids_,
accessor_mux);
}

void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
const ValueAccessorMultiplexer &accessor_mux) {
DCHECK(group_by_hashtable_pool_ != nullptr);
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
handles_[agg_idx]->insertValueAccessorIntoDistinctifyHashTable(
argument_ids_[agg_idx],
group_by_key_ids_,
accessor_mux,
distinctify_hashtables_[agg_idx].get());
}
}
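
  // Check out a hash table from the pool, upsert this block's groups into
  // it, then return it for reuse by other workers. The per-thread tables are
  // merged later in finalizeHashTableImplThreadPrivate().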
AggregationStateHashTableBase *agg_hash_table =
group_by_hashtable_pool_->getHashTable();
agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
group_by_key_ids_,
accessor_mux);
group_by_hashtable_pool_->returnHashTable(agg_hash_table);
}

void AggregationOperationState::finalizeAggregate(
const std::size_t partition_id,
InsertDestination *output_destination) {
if (group_by_key_ids_.empty()) {
DCHECK_EQ(0u, partition_id);
finalizeSingleState(output_destination);
} else {
finalizeHashTable(partition_id, output_destination);
}
}

void AggregationOperationState::finalizeSingleState(
InsertDestination *output_destination) {
// Simply build up a Tuple from the finalized values for each aggregate and
// insert it in '*output_destination'.
std::vector<TypedValue> attribute_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
single_states_[agg_idx].reset(
handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
*distinctify_hashtables_[agg_idx]));
}
attribute_values.emplace_back(
handles_[agg_idx]->finalize(*single_states_[agg_idx]));
}
output_destination->insertTuple(Tuple(std::move(attribute_values)));
}

void AggregationOperationState::finalizeHashTable(
const std::size_t partition_id,
InsertDestination *output_destination) {
if (is_aggregate_collision_free_) {
finalizeHashTableImplCollisionFree(partition_id, output_destination);
} else if (is_aggregate_partitioned_) {
finalizeHashTableImplPartitioned(partition_id, output_destination);
} else {
DCHECK_EQ(0u, partition_id);
finalizeHashTableImplThreadPrivate(output_destination);
}
}

void AggregationOperationState::finalizeHashTableImplCollisionFree(
const std::size_t partition_id,
InsertDestination *output_destination) {
std::vector<std::unique_ptr<ColumnVector>> final_values;
CollisionFreeVectorTable *hash_table =
static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
const std::size_t max_length =
hash_table->getNumTuplesInFinalizationPartition(partition_id);
ColumnVectorsValueAccessor complete_result;
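  // A CollisionFreeVectorTable is keyed by a single group-by column.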
DCHECK_EQ(1u, group_by_types_.size());
const Type *key_type = group_by_types_.front();
DCHECK(NativeColumnVector::UsableForType(*key_type));
std::unique_ptr<NativeColumnVector> key_cv(
std::make_unique<NativeColumnVector>(*key_type, max_length));
hash_table->finalizeKey(partition_id, key_cv.get());
complete_result.addColumn(key_cv.release());
for (std::size_t i = 0; i < handles_.size(); ++i) {
const Type *result_type = handles_[i]->getResultType();
DCHECK(NativeColumnVector::UsableForType(*result_type));
std::unique_ptr<NativeColumnVector> result_cv(
std::make_unique<NativeColumnVector>(*result_type, max_length));
hash_table->finalizeState(partition_id, i, result_cv.get());
complete_result.addColumn(result_cv.release());
}
// Bulk-insert the complete result.
output_destination->bulkInsertTuples(&complete_result);
}

void AggregationOperationState::finalizeHashTableImplPartitioned(
const std::size_t partition_id,
InsertDestination *output_destination) {
PackedPayloadHashTable *hash_table =
static_cast<PackedPayloadHashTable *>(partitioned_hashtable_.get());
ColumnVectorsValueAccessor complete_result;
hash_table->finalize(partition_id, &complete_result);
// Bulk-insert the complete result.
output_destination->bulkInsertTuples(&complete_result);
}

void AggregationOperationState::finalizeHashTableImplThreadPrivate(
InsertDestination *output_destination) {
  // TODO(harshad) - The merge phase may be slower when each hash table
  // contains a large number of entries. We should find ways in which we can
  // perform a parallel merge.
// TODO(harshad) - Find heuristics for faster merge, even in a single thread.
// e.g. Keep merging entries from smaller hash tables to larger.
auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
DCHECK(hash_tables != nullptr);
if (hash_tables->empty()) {
return;
}
std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr(
hash_tables->back().release());
for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
std::unique_ptr<AggregationStateHashTableBase> hash_table(
hash_tables->at(i).release());
mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
hash_table->destroyPayload();
}
PackedPayloadHashTable *final_hash_table =
static_cast<PackedPayloadHashTable *>(final_hash_table_ptr.get());
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
if (handles_.empty()) {
const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
const std::uint8_t *dumb_placeholder) -> void {
group_by_keys.emplace_back(std::move(group_by_key));
};
final_hash_table->forEachCompositeKey(&keys_retriever);
}
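  // When there are aggregate handles, finalizeHashTable() below retrieves the
  // group-by keys while finalizing, so the explicit key retrieval above is
  // only needed in the aggregate-free case.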
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
handles_[agg_idx]->allowUpdate();
handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
*distinctify_hashtables_[agg_idx], agg_idx, final_hash_table);
}
ColumnVector *agg_result_col =
handles_[agg_idx]->finalizeHashTable(
*final_hash_table, agg_idx, &group_by_keys);
DCHECK(agg_result_col != nullptr);
final_values.emplace_back(agg_result_col);
}
final_hash_table->destroyPayload();
// Reorganize 'group_by_keys' in column-major order so that we can make a
// ColumnVectorsValueAccessor to bulk-insert results.
//
// TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
// if there is only one aggregate. The need to do this should hopefully go
// away when we work out storing composite structures for multiple aggregates
// in a single HashTable.
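  // The two branches below differ only in the concrete ColumnVector type:
  // fixed-length types go into a NativeColumnVector, while variable-length
  // types fall back to an IndirectColumnVector.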
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
std::size_t group_by_element_idx = 0;
for (const Type *group_by_type : group_by_types_) {
if (NativeColumnVector::UsableForType(*group_by_type)) {
NativeColumnVector *element_cv =
new NativeColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
element_cv->appendTypedValue(
std::move(group_key[group_by_element_idx]));
}
} else {
IndirectColumnVector *element_cv =
new IndirectColumnVector(*group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
element_cv->appendTypedValue(
std::move(group_key[group_by_element_idx]));
}
}
++group_by_element_idx;
}
// Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
// and the finalized aggregates.
ColumnVectorsValueAccessor complete_result;
for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
complete_result.addColumn(group_by_cv.release());
}
for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
complete_result.addColumn(final_value_cv.release());
}
// Bulk-insert the complete result.
output_destination->bulkInsertTuples(&complete_result);
}

} // namespace quickstep