Determine #InitPartitions for CollisionFreeVectorTable in the optimizer.
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 69105e6..fdf8796 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -65,6 +65,7 @@
quickstep_expressions_Expressions_proto
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunction_proto
+ quickstep_expressions_aggregation_AggregationID
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
@@ -150,12 +151,14 @@
quickstep_storage_InsertDestination_proto
quickstep_storage_StorageBlockLayout
quickstep_storage_StorageBlockLayout_proto
+ quickstep_storage_StorageConstants
quickstep_storage_SubBlockTypeRegistry
quickstep_types_Type
quickstep_types_Type_proto
quickstep_types_TypedValue
quickstep_types_TypedValue_proto
quickstep_types_containers_Tuple_proto
+ quickstep_utility_BarrieredReadWriteConcurrentBitVector
quickstep_utility_Macros
quickstep_utility_SqlError)
if (ENABLE_DISTRIBUTED)
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 88dc505..9bfd136 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -20,6 +20,7 @@
#include "query_optimizer/ExecutionGenerator.hpp"
#include <algorithm>
+#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
@@ -52,6 +53,7 @@
#include "expressions/Expressions.pb.h"
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
@@ -133,19 +135,23 @@
#include "storage/InsertDestination.pb.h"
#include "storage/StorageBlockLayout.hpp"
#include "storage/StorageBlockLayout.pb.h"
+#include "storage/StorageConstants.hpp"
#include "storage/SubBlockTypeRegistry.hpp"
#include "types/Type.hpp"
#include "types/Type.pb.h"
#include "types/TypedValue.hpp"
#include "types/TypedValue.pb.h"
#include "types/containers/Tuple.pb.h"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
#include "utility/SqlError.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
+using std::atomic;
using std::make_unique;
using std::move;
+using std::pair;
using std::static_pointer_cast;
using std::unique_ptr;
using std::unordered_map;
@@ -191,6 +197,61 @@
namespace {
+size_t CacheLineAlignedBytes(const size_t actual_bytes) {
+ return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
+}
+
+size_t CalculateNumInitializationPartitionsForCollisionFreeVectorTable(const size_t memory_size) {
+ // At least 1 partition, at most (#workers * 2) partitions.
+ return std::max(1uL, std::min(memory_size / kCollisonFreeVectorInitBlobSize,
+ static_cast<size_t>(2 * FLAGS_num_workers)));
+}
+
+void CalculateCollisionFreeAggregationInfo(
+ const size_t num_entries, const vector<pair<AggregationID, vector<const Type *>>> &group_by_aggrs_info,
+ S::CollisionFreeVectorInfo *collision_free_vector_info) {
+ size_t memory_size = CacheLineAlignedBytes(
+ BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+
+ for (std::size_t i = 0; i < group_by_aggrs_info.size(); ++i) {
+ const auto &group_by_aggr_info = group_by_aggrs_info[i];
+
+ size_t state_size = 0;
+ switch (group_by_aggr_info.first) {
+ case AggregationID::kCount: {
+ state_size = sizeof(atomic<size_t>);
+ break;
+ }
+ case AggregationID::kSum: {
+ const vector<const Type *> &argument_types = group_by_aggr_info.second;
+ DCHECK_EQ(1u, argument_types.size());
+ switch (argument_types.front()->getTypeID()) {
+ case TypeID::kInt:
+ case TypeID::kLong:
+ state_size = sizeof(atomic<std::int64_t>);
+ break;
+ case TypeID::kFloat:
+ case TypeID::kDouble:
+ state_size = sizeof(atomic<double>);
+ break;
+ default:
+ LOG(FATAL) << "No support by CollisionFreeVector";
+ }
+ break;
+ }
+ default:
+ LOG(FATAL) << "No support by CollisionFreeVector";
+ }
+
+ collision_free_vector_info->add_state_offsets(memory_size);
+ memory_size += CacheLineAlignedBytes(state_size * num_entries);
+ }
+
+ collision_free_vector_info->set_memory_size(memory_size);
+ collision_free_vector_info->set_num_init_partitions(
+ CalculateNumInitializationPartitionsForCollisionFreeVectorTable(memory_size));
+}
+
size_t CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(const size_t num_entries) {
// Set finalization segment size as 4096 entries.
constexpr size_t kFinalizeSegmentSize = 4uL * 1024L;
@@ -1737,6 +1798,7 @@
const auto &aggregate_expressions = physical_plan->aggregate_expressions();
vector<bool> is_distincts;
+ vector<pair<AggregationID, vector<const Type *>>> group_by_aggrs_info;
for (const E::AliasPtr &named_aggregate_expression : aggregate_expressions) {
const E::AggregateFunctionPtr unnamed_aggregate_expression =
std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
@@ -1745,15 +1807,22 @@
S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
// Set the AggregateFunction.
- aggr_proto->mutable_function()->MergeFrom(
- unnamed_aggregate_expression->getAggregate().getProto());
+ const AggregateFunction &aggr_func = unnamed_aggregate_expression->getAggregate();
+ aggr_proto->mutable_function()->MergeFrom(aggr_func.getProto());
// Add each of the aggregate's arguments.
+ vector<const Type *> argument_types;
for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+ argument_types.push_back(&concretized_argument->getType());
+
aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto());
}
+ if (!group_by_types.empty()) {
+ group_by_aggrs_info.emplace_back(aggr_func.getAggregationID(), move(argument_types));
+ }
+
// Set whether it is a DISTINCT aggregation.
const bool is_distinct = unnamed_aggregate_expression->is_distinct();
aggr_proto->set_is_distinct(is_distinct);
@@ -1786,6 +1855,10 @@
aggr_state_proto->set_estimated_num_entries(max_num_groups);
use_parallel_initialization = true;
aggr_state_num_partitions = CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(max_num_groups);
+
+ DCHECK(!group_by_aggrs_info.empty());
+ CalculateCollisionFreeAggregationInfo(max_num_groups, group_by_aggrs_info,
+ aggr_state_proto->mutable_collision_free_vector_info());
} else {
if (cost_model_for_aggregation_->canUseTwoPhaseCompactKeyAggregation(
physical_plan, estimated_num_groups)) {
@@ -1831,12 +1904,15 @@
}
if (use_parallel_initialization) {
+ DCHECK(aggr_state_proto->has_collision_free_vector_info());
+
const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
execution_plan_->addRelationalOperator(
new InitializeAggregationOperator(
query_handle_->query_id(),
aggr_state_index,
- num_partitions));
+ num_partitions,
+ aggr_state_proto->collision_free_vector_info().num_init_partitions()));
execution_plan_->addDirectDependency(aggregation_operator_index,
initialize_aggregation_operator_index,
@@ -1939,6 +2015,7 @@
aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto());
}
+ vector<pair<AggregationID, vector<const Type *>>> group_by_aggrs_info;
for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
const E::AggregateFunctionPtr unnamed_aggregate_expression =
std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
@@ -1947,26 +2024,35 @@
S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
// Set the AggregateFunction.
- aggr_proto->mutable_function()->MergeFrom(
- unnamed_aggregate_expression->getAggregate().getProto());
+ const AggregateFunction &aggr_func = unnamed_aggregate_expression->getAggregate();
+ aggr_proto->mutable_function()->MergeFrom(aggr_func.getProto());
// Add each of the aggregate's arguments.
+ vector<const Type *> argument_types;
for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+ argument_types.push_back(&concretized_argument->getType());
+
aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto());
}
+ group_by_aggrs_info.emplace_back(aggr_func.getAggregationID(), move(argument_types));
+
// Set whether it is a DISTINCT aggregation.
DCHECK(!unnamed_aggregate_expression->is_distinct());
aggr_proto->set_is_distinct(false);
}
+ CalculateCollisionFreeAggregationInfo(estimated_num_entries, group_by_aggrs_info,
+ aggr_state_proto->mutable_collision_free_vector_info());
+
const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
execution_plan_->addRelationalOperator(
new InitializeAggregationOperator(
query_handle_->query_id(),
aggr_state_index,
- num_partitions));
+ num_partitions,
+ aggr_state_proto->collision_free_vector_info().num_init_partitions()));
const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
execution_plan_->addRelationalOperator(
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index e197b08..f91299d 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -46,7 +46,7 @@
DCHECK(agg_state != nullptr);
for (std::size_t state_part_id = 0;
- state_part_id < agg_state->getNumInitializationPartitions();
+ state_part_id < aggr_state_num_init_partitions_;
++state_part_id) {
container->addNormalWorkOrder(
new InitializeAggregationWorkOrder(query_id_,
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
index 0a9d25d..b7e9aae 100644
--- a/relational_operators/InitializeAggregationOperator.hpp
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -58,13 +58,17 @@
* @param aggr_state_index The index of the AggregationOperationState in QueryContext.
* @param num_partitions The number of partitions in 'input_relation'. If no
* partitions, it is one.
+ * @param aggr_state_num_init_partitions The number of partitions to be used
+ * for initialize the aggregation state collision free vector table.
**/
InitializeAggregationOperator(const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
- const std::size_t num_partitions)
+ const std::size_t num_partitions,
+ const std::size_t aggr_state_num_init_partitions)
: RelationalOperator(query_id),
aggr_state_index_(aggr_state_index),
num_partitions_(num_partitions),
+ aggr_state_num_init_partitions_(aggr_state_num_init_partitions),
started_(false) {}
~InitializeAggregationOperator() override {}
@@ -87,7 +91,7 @@
private:
const QueryContext::aggregation_state_id aggr_state_index_;
- const std::size_t num_partitions_;
+ const std::size_t num_partitions_, aggr_state_num_init_partitions_;
bool started_;
DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 3d1c14a..0f4795f 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -40,8 +40,9 @@
#include "expressions/scalar/Scalar.hpp"
#include "storage/AggregationOperationState.pb.h"
#include "storage/CollisionFreeVectorTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "storage/HashTable.pb.h"
#include "storage/HashTableBase.hpp"
+#include "storage/HashTableFactory.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/PackedPayloadHashTable.hpp"
#include "storage/StorageBlock.hpp"
@@ -63,8 +64,13 @@
#include "glog/logging.h"
+using std::size_t;
+using std::vector;
+
namespace quickstep {
+namespace S = serialization;
+
AggregationOperationState::AggregationOperationState(
const CatalogRelationSchema &input_relation,
const std::vector<const AggregateFunction *> &aggregate_functions,
@@ -77,7 +83,10 @@
const std::size_t num_partitions,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ const size_t collision_free_vector_memory_size,
+ const size_t collision_free_vector_num_init_partitions,
+ const vector<size_t> &collision_free_vector_state_offsets)
: input_relation_(input_relation),
is_aggregate_collision_free_(
group_by.empty() ? false
@@ -208,7 +217,10 @@
estimated_num_entries,
group_by_handles,
storage_manager,
- num_partitions));
+ num_partitions,
+ collision_free_vector_memory_size,
+ collision_free_vector_num_init_partitions,
+ collision_free_vector_state_offsets));
} else if (is_aggregate_partitioned_) {
if (all_distinct_) {
DCHECK_EQ(1u, group_by_handles.size());
@@ -288,6 +300,19 @@
PredicateFactory::ReconstructFromProto(proto.predicate(), database));
}
+ size_t collision_free_vector_memory_size = 0;
+ size_t collision_free_vector_num_init_partitions = 0;
+ vector<size_t> collision_free_vector_state_offsets;
+ if (proto.has_collision_free_vector_info()) {
+ const serialization::CollisionFreeVectorInfo &collision_free_vector_info =
+ proto.collision_free_vector_info();
+ collision_free_vector_memory_size = collision_free_vector_info.memory_size();
+ collision_free_vector_num_init_partitions = collision_free_vector_info.num_init_partitions();
+ for (int i = 0; i < collision_free_vector_info.state_offsets_size(); ++i) {
+ collision_free_vector_state_offsets.push_back(collision_free_vector_info.state_offsets(i));
+ }
+ }
+
return new AggregationOperationState(
database.getRelationSchemaById(proto.relation_id()),
aggregate_functions,
@@ -300,7 +325,10 @@
proto.num_partitions(),
HashTableImplTypeFromProto(proto.hash_table_impl_type()),
distinctify_hash_table_impl_types,
- storage_manager);
+ storage_manager,
+ collision_free_vector_memory_size,
+ collision_free_vector_num_init_partitions,
+ collision_free_vector_state_offsets);
}
bool AggregationOperationState::ProtoIsValid(
@@ -345,18 +373,31 @@
}
}
- for (int i = 0; i < proto.group_by_expressions_size(); ++i) {
+ const int group_by_expressions_size = proto.group_by_expressions_size();
+ for (int i = 0; i < group_by_expressions_size; ++i) {
if (!ScalarFactory::ProtoIsValid(proto.group_by_expressions(i), database)) {
return false;
}
}
- if (proto.group_by_expressions_size() > 0) {
+ if (group_by_expressions_size > 0) {
if (!proto.has_hash_table_impl_type() ||
!serialization::HashTableImplType_IsValid(
proto.hash_table_impl_type())) {
return false;
}
+
+ if (proto.hash_table_impl_type() == S::HashTableImplType::COLLISION_FREE_VECTOR) {
+ if (!proto.has_collision_free_vector_info()) {
+ return false;
+ }
+
+ const S::CollisionFreeVectorInfo &proto_collision_free_vector_info = proto.collision_free_vector_info();
+ if (!proto_collision_free_vector_info.IsInitialized() ||
+ proto_collision_free_vector_info.state_offsets_size() != group_by_expressions_size) {
+ return false;
+ }
+ }
}
if (proto.has_predicate()) {
@@ -368,15 +409,6 @@
return true;
}
-std::size_t AggregationOperationState::getNumInitializationPartitions() const {
- if (is_aggregate_collision_free_) {
- return static_cast<CollisionFreeVectorTable *>(
- collision_free_hashtable_.get())->getNumInitializationPartitions();
- } else {
- return 0u;
- }
-}
-
CollisionFreeVectorTable* AggregationOperationState
::getCollisionFreeVectorTable() const {
return static_cast<CollisionFreeVectorTable *>(
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 6174478..c7680b0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -108,6 +108,12 @@
* @param storage_manager The StorageManager to use for allocating hash
* tables. Single aggregation state (when GROUP BY list is not
* specified) is not allocated using memory from storage manager.
+ * @param collision_free_vector_memory_size For CollisionFreeVectorTable,
+ * the memory size.
+ * @param collision_free_vector_num_init_partitions For
+ * CollisionFreeVectorTable, the number of partitions to initialize.
+ * @param collision_free_vector_state_offsets For CollisionFreeVectorTable,
+ * the offsets for each state.
*/
AggregationOperationState(
const CatalogRelationSchema &input_relation,
@@ -121,7 +127,10 @@
const std::size_t num_partitions,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
- StorageManager *storage_manager);
+ StorageManager *storage_manager,
+ const std::size_t collision_free_vector_memory_size = 0,
+ const std::size_t collision_free_vector_num_init_partitions = 0,
+ const std::vector<std::size_t> &collision_free_vector_state_offsets = std::vector<std::size_t>());
~AggregationOperationState() {}
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index 1a8a302..d2305f1 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -45,4 +45,7 @@
optional bool is_partitioned = 8;
optional uint64 num_partitions = 9 [default = 1];
+
+ // Required if 'hash_table_impl_type' is 'COLLISION_FREE_VECTOR'.
+ optional CollisionFreeVectorInfo collision_free_vector_info = 10;
}
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 3d90bc5..6446a83 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -283,6 +283,7 @@
quickstep_storage_HashTableBase
quickstep_storage_HashTableFactory
quickstep_storage_HashTablePool
+ quickstep_storage_HashTable_proto
quickstep_storage_InsertDestination
quickstep_storage_PartitionedHashTablePool
quickstep_storage_PackedPayloadHashTable
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index 679f77b..e803954 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -43,68 +43,33 @@
CollisionFreeVectorTable::CollisionFreeVectorTable(
const Type *key_type,
const std::size_t num_entries,
+ const std::size_t memory_size,
+ const std::size_t num_init_partitions,
const std::size_t num_finalize_partitions,
+ const std::vector<std::size_t> &state_offsets,
const std::vector<AggregationHandle *> &handles,
StorageManager *storage_manager)
: key_type_(key_type),
num_entries_(num_entries),
num_handles_(handles.size()),
handles_(handles),
+ memory_size_(memory_size),
+ num_init_partitions_(num_init_partitions),
num_finalize_partitions_(num_finalize_partitions),
storage_manager_(storage_manager) {
DCHECK_GT(num_entries, 0u);
DCHECK_GT(num_finalize_partitions_, 0u);
-
- std::size_t required_memory = 0;
- const std::size_t existence_map_offset = 0;
- std::vector<std::size_t> state_offsets;
-
- required_memory += CacheLineAlignedBytes(
- BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
-
- for (std::size_t i = 0; i < num_handles_; ++i) {
- const AggregationHandle *handle = handles_[i];
- const std::vector<const Type *> argument_types = handle->getArgumentTypes();
-
- std::size_t state_size = 0;
- switch (handle->getAggregationID()) {
- case AggregationID::kCount: {
- state_size = sizeof(std::atomic<std::size_t>);
- break;
- }
- case AggregationID::kSum: {
- DCHECK_EQ(1u, argument_types.size());
- switch (argument_types.front()->getTypeID()) {
- case TypeID::kInt: // Fall through
- case TypeID::kLong:
- state_size = sizeof(std::atomic<std::int64_t>);
- break;
- case TypeID::kFloat: // Fall through
- case TypeID::kDouble:
- state_size = sizeof(std::atomic<double>);
- break;
- default:
- LOG(FATAL) << "Not implemented";
- }
- break;
- }
- default:
- LOG(FATAL) << "Not implemented";
- }
-
- state_offsets.emplace_back(required_memory);
- required_memory += CacheLineAlignedBytes(state_size * num_entries);
- }
+ DCHECK_EQ(num_handles_, state_offsets.size());
const std::size_t num_storage_slots =
- storage_manager_->SlotsNeededForBytes(required_memory);
+ storage_manager_->SlotsNeededForBytes(memory_size_);
const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
blob_ = storage_manager_->getBlobMutable(blob_id);
void *memory_start = blob_->getMemoryMutable();
existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
- reinterpret_cast<char *>(memory_start) + existence_map_offset,
+ reinterpret_cast<char *>(memory_start),
num_entries,
false /* initialize */));
@@ -113,9 +78,6 @@
vec_tables_.emplace_back(
reinterpret_cast<char *>(memory_start) + state_offsets.at(i));
}
-
- memory_size_ = required_memory;
- num_init_partitions_ = CalculateNumInitializationPartitions(memory_size_);
}
CollisionFreeVectorTable::~CollisionFreeVectorTable() {
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 5aeb5cf..8e1342b 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -58,8 +58,12 @@
*
* @param key_type The group-by key type.
* @param num_entries The estimated number of entries this table will hold.
+ * @param memory_size The memory size for this table.
+ * @param num_init_partitions The number of partitions to be used for
+ * initializing the aggregation.
* @param num_finalize_partitions The number of partitions to be used for
* finalizing the aggregation.
+ * @param state_offsets The offsets for each state in the table.
* @param handles The aggregation handles.
* @param storage_manager The StorageManager to use (a StorageBlob will be
* allocated to hold this table's contents).
@@ -67,7 +71,10 @@
CollisionFreeVectorTable(
const Type *key_type,
const std::size_t num_entries,
+ const std::size_t memory_size,
+ const std::size_t num_init_partitions,
const std::size_t num_finalize_partitions,
+ const std::vector<std::size_t> &state_offsets,
const std::vector<AggregationHandle *> &handles,
StorageManager *storage_manager);
@@ -181,21 +188,6 @@
}
private:
- inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
- return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
- }
-
- inline static std::size_t CalculateNumInitializationPartitions(
- const std::size_t memory_size) {
- // Set initialization memory block size as 4MB.
- constexpr std::size_t kInitBlockSize = 4uL * 1024u * 1024u;
-
- // At least 1 partition, at most 80 partitions.
- // TODO(jianqiao): set the upbound as (# of workers * 2) instead of the
- // hardcoded 80.
- return std::max(1uL, std::min(memory_size / kInitBlockSize, 80uL));
- }
-
inline std::size_t calculatePartitionLength() const {
const std::size_t partition_length =
(num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
@@ -333,14 +325,13 @@
std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
std::vector<void *> vec_tables_;
+ const std::size_t memory_size_;
+ const std::size_t num_init_partitions_;
const std::size_t num_finalize_partitions_;
StorageManager *storage_manager_;
MutableBlobReference blob_;
- std::size_t memory_size_;
- std::size_t num_init_partitions_;
-
DISALLOW_COPY_AND_ASSIGN(CollisionFreeVectorTable);
};
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index ed383df..d489b9f 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -29,6 +29,12 @@
THREAD_PRIVATE_COMPACT_KEY = 4;
}
+message CollisionFreeVectorInfo {
+ required uint64 memory_size = 1;
+ required uint64 num_init_partitions = 2;
+ repeated uint64 state_offsets = 3;
+}
+
// NOTE(chasseur): This proto describes the run-time parameters for a resizable
// HashTable. It does not describe any template parameters of the HashTable
// class, which are different in different contexts (e.g. join vs. grouping).
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index d367160..732920f 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -358,6 +358,12 @@
* hash table constructor.
* @param num_partitions The number of partitions of this aggregation state
* hash table.
+ * @param collision_free_vector_memory_size For CollisionFreeVectorTable,
+ * the memory size.
+ * @param collision_free_vector_num_init_partitions For
+ * CollisionFreeVectorTable, the number of partitions to initialize.
+ * @param collision_free_vector_state_offsets For CollisionFreeVectorTable,
+ * the offsets for each state.
* @return A new aggregation state hash table.
**/
static AggregationStateHashTableBase* CreateResizable(
@@ -366,12 +372,17 @@
const std::size_t num_entries,
const std::vector<AggregationHandle *> &handles,
StorageManager *storage_manager,
- const std::size_t num_partitions = 1u) {
+ const std::size_t num_partitions = 1u,
+ const std::size_t collision_free_vector_memory_size = 0,
+ const std::size_t collision_free_vector_num_init_partitions = 0,
+ const std::vector<std::size_t> &collision_free_vector_state_offsets = std::vector<std::size_t>()) {
switch (hash_table_type) {
case HashTableImplType::kCollisionFreeVector:
DCHECK_EQ(1u, key_types.size());
return new CollisionFreeVectorTable(
- key_types.front(), num_entries, num_partitions, handles, storage_manager);
+ key_types.front(), num_entries, collision_free_vector_memory_size,
+ collision_free_vector_num_init_partitions, num_partitions,
+ collision_free_vector_state_offsets, handles, storage_manager);
case HashTableImplType::kSeparateChaining:
return new PackedPayloadHashTable(
key_types, num_entries, handles, storage_manager);
diff --git a/storage/StorageConstants.hpp b/storage/StorageConstants.hpp
index 037a8a9..c4debe0 100644
--- a/storage/StorageConstants.hpp
+++ b/storage/StorageConstants.hpp
@@ -122,6 +122,9 @@
// to indicate zero-sized blocks or sub-blocks.
const std::size_t kZeroSize = 0;
+// Set initialization memory blob size for CollisonFreeVector as 4MB.
+constexpr std::size_t kCollisonFreeVectorInitBlobSize = 4uL * 1024u * 1024u;
+
/** @} */
} // namespace quickstep