Updates
diff --git a/expressions/aggregation/AggFunc.hpp b/expressions/aggregation/AggFunc.hpp
index 31f385e..98b19ba 100644
--- a/expressions/aggregation/AggFunc.hpp
+++ b/expressions/aggregation/AggFunc.hpp
@@ -69,6 +69,11 @@
typename AggState<ArgType>::AtomicT> {};
template <typename ArgType>
+ inline static void InitAtomic(typename AggState<ArgType>::AtomicT *state) {
+ state->store(0, std::memory_order_relaxed);  // Zero the atomic state (relaxed store).
+ }
+
+ template <typename ArgType>
inline static void MergeArgAtomic(const typename ArgType::cpptype &value,
typename AggState<ArgType>::AtomicT *state) {
LOG(FATAL) << "Not implemented";
@@ -81,6 +86,11 @@
}
template <typename ArgType>
+ inline static void InitUnsafe(typename AggState<ArgType>::T *state) {
+ *state = 0;  // Zero the non-atomic state.
+ }
+
+ template <typename ArgType>
inline static void MergeArgUnsafe(const typename ArgType::cpptype &value,
typename AggState<ArgType>::T *state) {
*state += value;
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 00bb433..392e0f6 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -212,7 +212,7 @@
AggregationStateHashTableFactory::CreateResizable(
hash_table_impl_type,
group_by_types_,
- estimated_num_entries,
+ estimated_num_entries * 2,  // NOTE(review): 2x the estimate; rationale not stated, confirm.
group_by_handles,
storage_manager_));
} else {
@@ -384,12 +384,12 @@
}
}
- // There are GROUP BYs without DISTINCT. Check if the estimated number of
- // groups is large enough to warrant a partitioned aggregation.
- return estimated_num_groups >
- static_cast<std::size_t>(
- FLAGS_partition_aggregation_num_groups_threshold);
- return false;
+// // There are GROUP BYs without DISTINCT. Check if the estimated number of
+// // groups is large enough to warrant a partitioned aggregation.
+// return estimated_num_groups >
+// static_cast<std::size_t>(
+// FLAGS_partition_aggregation_num_groups_threshold);
+ return true;  // NOTE(review): threshold check above is disabled, so this always partitions; confirm before merging.
}
std::size_t AggregationOperationState::getNumInitializationPartitions() const {
@@ -663,60 +663,10 @@
InsertDestination *output_destination) {
PackedPayloadHashTable *hash_table =
static_cast<PackedPayloadHashTable *>(partitioned_hashtable_.get());
+// std::cout << hash_table->numEntries() << "\n";
- // Each element of 'group_by_keys' is a vector of values for a particular
- // group (which is also the prefix of the finalized Tuple for that group).
- std::vector<std::vector<TypedValue>> group_by_keys;
-
- if (handles_.empty()) {
- hash_table->forEachCompositeKeyInPartition(
- partition_id,
- [&](std::vector<TypedValue> &group_by_key) -> void {
- group_by_keys.emplace_back(std::move(group_by_key));
- });
- }
-
- // Collect per-aggregate finalized values.
- std::vector<std::unique_ptr<ColumnVector>> final_values;
- for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
- ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
- *hash_table, agg_idx, &group_by_keys);
- if (agg_result_col != nullptr) {
- final_values.emplace_back(agg_result_col);
- }
- }
-// hash_table->destroyPayload();
-
- std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
- std::size_t group_by_element_idx = 0;
- for (const Type *group_by_type : group_by_types_) {
- if (NativeColumnVector::UsableForType(*group_by_type)) {
- NativeColumnVector *element_cv =
- new NativeColumnVector(*group_by_type, group_by_keys.size());
- group_by_cvs.emplace_back(element_cv);
- for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
- }
- } else {
- IndirectColumnVector *element_cv =
- new IndirectColumnVector(*group_by_type, group_by_keys.size());
- group_by_cvs.emplace_back(element_cv);
- for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
- }
- }
- ++group_by_element_idx;
- }
-
- // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
- // and the finalized aggregates.
ColumnVectorsValueAccessor complete_result;
- for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
- complete_result.addColumn(group_by_cv.release());
- }
- for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
- complete_result.addColumn(final_value_cv.release());
- }
+ hash_table->finalize(partition_id, &complete_result);  // Appends key columns, then one column per aggregate.
// Bulk-insert the complete result.
output_destination->bulkInsertTuples(&complete_result);
diff --git a/storage/AggregationUtil.hpp b/storage/AggregationUtil.hpp
new file mode 100644
index 0000000..74a9095
--- /dev/null
+++ b/storage/AggregationUtil.hpp
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_AGGREGATION_UTIL_HPP_
+#define QUICKSTEP_STORAGE_AGGREGATION_UTIL_HPP_
+
+#include <type_traits>
+
+#include "expressions/aggregation/AggregationID.hpp"
+#include "expressions/aggregation/AggFunc.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/DoubleType.hpp"
+#include "types/FloatType.hpp"
+#include "types/IntType.hpp"
+#include "types/LongType.hpp"
+#include "types/TypeID.hpp"
+#include "types/Type.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ * @{
+ */
+
+/**
+ * @brief Strips the reference and then the top-level const qualifier from T.
+ **/
+template <typename T>
+using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>;
+
+/**
+ * @brief Invokes a functor on a key type downcast to its concrete class.
+ *
+ * Only INT and LONG keys are supported; any other type ID is a fatal error.
+ *
+ * @param type The key type.
+ * @param functor Called with the concrete type, e.g. const IntType&.
+ * @return Whatever the functor returns.
+ **/
+template <typename FunctorT>
+inline auto InvokeOnKeyType(const Type &type,
+ const FunctorT &functor) {
+ switch (type.getTypeID()) {
+ case TypeID::kInt:
+ return functor(static_cast<const IntType&>(type));
+ case TypeID::kLong:
+ return functor(static_cast<const LongType&>(type));
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+/**
+ * @brief Invokes a functor on a numeric argument type downcast to its
+ * concrete class.
+ *
+ * Only INT, LONG, FLOAT and DOUBLE are supported; any other type ID is a
+ * fatal error.
+ *
+ * @param type The argument type.
+ * @param functor Called with the concrete type, e.g. const DoubleType&.
+ * @return Whatever the functor returns.
+ **/
+template <typename FunctorT>
+inline auto InvokeOnType(const Type &type,
+ const FunctorT &functor) {
+ switch (type.getTypeID()) {
+ case TypeID::kInt:
+ return functor(static_cast<const IntType&>(type));
+ case TypeID::kLong:
+ return functor(static_cast<const LongType&>(type));
+ case TypeID::kFloat:
+ return functor(static_cast<const FloatType&>(type));
+ case TypeID::kDouble:
+ return functor(static_cast<const DoubleType&>(type));
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+/**
+ * @brief Lifts a runtime bool into std::true_type / std::false_type so the
+ * functor can branch on it at compile time.
+ **/
+template <typename FunctorT>
+inline auto InvokeOnBool(const bool &val,
+ const FunctorT &functor) {
+ if (val) {
+ return functor(std::true_type());
+ } else {
+ return functor(std::false_type());
+ }
+}
+
+/**
+ * @brief Two-flag version of InvokeOnBool(): the functor receives one
+ * std::true_type / std::false_type tag per flag, in argument order.
+ **/
+template <typename FunctorT>
+inline auto InvokeOnBools(const bool &val1,
+ const bool &val2,
+ const FunctorT &functor) {
+ if (val1) {
+ if (val2) {
+ return functor(std::true_type(), std::true_type());
+ } else {
+ return functor(std::true_type(), std::false_type());
+ }
+ } else {
+ if (val2) {
+ return functor(std::false_type(), std::true_type());
+ } else {
+ return functor(std::false_type(), std::false_type());
+ }
+ }
+}
+
+/**
+ * @brief Invokes a functor on the aggregation-function object selected by
+ * agg_id.
+ *
+ * Only SUM is supported (the functor receives a Sum object); any other
+ * aggregation ID is a fatal error.
+ **/
+template <typename FunctorT>
+inline auto InvokeOnAggFunc(const AggregationID &agg_id,
+ const FunctorT &functor) {
+ switch (agg_id) {
+ case AggregationID::kSum: {
+ return functor(Sum());
+ }
+ default:
+ LOG(FATAL) << "Not supported";
+ }
+}
+
+/**
+ * @brief Calls a nullary functor only for a std::true_type tag; the
+ * std::false_type overload below does nothing.
+ **/
+template <typename FunctorT>
+inline auto InvokeIf(const std::true_type & /* tag */,
+ const FunctorT &functor) {
+ return functor();
+}
+
+template <typename FunctorT>
+inline void InvokeIf(const std::false_type & /* tag */,
+ const FunctorT & /* functor */) {
+}
+
+//template <typename FunctorT>
+//inline void InvokeOnAggFuncIfApplicableToArgType(
+// const AggregationID &agg_id,
+// const Type &arg_type,
+// const FunctorT &functor) {
+// InvokeOnAggFunc(
+// agg_id,
+// [&](const auto &agg_func) -> void {
+// InvokeOnType(
+// arg_type,
+// [&](const auto &arg_type) -> void {
+// using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+// using ArgT = remove_const_reference_t<decltype(arg_type)>;
+//
+// InvokeIf(
+// typename AggFuncT::template HasAtomicImpl<ArgT>(),
+// [&]() -> void {
+// functor(agg_func, arg_type);
+// });
+// });
+// });
+//}
+
+/**
+ * @brief Invokes a functor on the aggregation-function object for agg_id
+ * and the concrete class of arg_type, i.e. functor(agg_func, arg_type)
+ * with both resolved as by InvokeOnAggFunc() and InvokeOnType().
+ **/
+template <typename FunctorT>
+inline void InvokeOnAggFuncWithArgType(
+ const AggregationID &agg_id,
+ const Type &arg_type,
+ const FunctorT &functor) {
+ InvokeOnAggFunc(
+ agg_id,
+ [&](const auto &agg_func) -> void {
+ InvokeOnType(
+ arg_type,
+ [&](const auto &arg_type) -> void {
+ functor(agg_func, arg_type);
+ });
+ });
+}
+
+/**
+ * @brief Invokes a functor on two accessors drawn from an accessor multiplexer.
+ *
+ * The base accessor is resolved to its concrete class; the derived accessor
+ * is treated as a ColumnVectorsValueAccessor. The functor is called as
+ * functor(is_mixed, first_accessor, second_accessor), where each accessor
+ * comes from the side named by first_source / second_source and is_mixed
+ * is std::true_type exactly when the two sources differ.
+ *
+ * @return Whatever the functor returns.
+ **/
+template <typename FunctorT>
+inline auto InvokeOnTwoAccessors(
+ const ValueAccessorMultiplexer &accessor_mux,
+ const ValueAccessorSource &first_source,
+ const ValueAccessorSource &second_source,
+ const FunctorT &functor) {
+ ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+ ColumnVectorsValueAccessor *derived_accessor =
+ static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
+
+ // Forward the functor's result to the caller.
+ return InvokeOnAnyValueAccessor(
+ base_accessor,
+ [&](auto *accessor) {
+ if (first_source == ValueAccessorSource::kBase) {
+ if (second_source == ValueAccessorSource::kBase) {
+ return functor(std::false_type(), accessor, accessor);
+ } else {
+ return functor(std::true_type(), accessor, derived_accessor);
+ }
+ } else {
+ if (second_source == ValueAccessorSource::kBase) {
+ return functor(std::true_type(), derived_accessor, accessor);
+ } else {
+ return functor(std::false_type(), derived_accessor, derived_accessor);
+ }
+ }
+ });
+}
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_AGGREGATION_UTIL_HPP_
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index fcc069b..8ef3560 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -145,6 +145,7 @@
AggregationOperationState.cpp
AggregationOperationState.hpp)
add_library(quickstep_storage_AggregationOperationState_proto ${storage_AggregationOperationState_proto_srcs})
+add_library(quickstep_storage_AggregationUtil ../empty_src.cpp AggregationUtil.hpp)  # Header-only; empty_src.cpp presumably a placeholder TU.
add_library(quickstep_storage_BasicColumnStoreTupleStorageSubBlock
BasicColumnStoreTupleStorageSubBlock.cpp
BasicColumnStoreTupleStorageSubBlock.hpp)
@@ -304,6 +305,19 @@
quickstep_expressions_aggregation_AggregateFunction_proto
quickstep_storage_HashTable_proto
${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_storage_AggregationUtil
+ quickstep_expressions_aggregation_AggFunc
+ quickstep_expressions_aggregation_AggregationID
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorMultiplexer
+ quickstep_storage_ValueAccessorUtil
+ quickstep_types_DoubleType  # Concrete types downcast to in InvokeOnType() / InvokeOnKeyType().
+ quickstep_types_FloatType
+ quickstep_types_IntType
+ quickstep_types_LongType
+ quickstep_types_Type
+ quickstep_types_TypeID
+ quickstep_types_containers_ColumnVectorsValueAccessor)
target_link_libraries(quickstep_storage_BasicColumnStoreTupleStorageSubBlock
quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogRelationSchema
@@ -441,6 +455,7 @@
quickstep_expressions_aggregation_AggFunc
quickstep_expressions_aggregation_AggregationHandle
quickstep_expressions_aggregation_AggregationID
+ quickstep_storage_AggregationUtil
quickstep_storage_HashTableBase
quickstep_storage_StorageBlob
quickstep_storage_StorageBlockInfo
@@ -453,7 +468,6 @@
quickstep_types_Type
quickstep_types_TypeID
quickstep_types_containers_ColumnVector
- quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_utility_BoolVector
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_ColumnStoreUtil
@@ -801,8 +815,11 @@
quickstep_utility_Macros
quickstep_utility_PrimeNumber)
target_link_libraries(quickstep_storage_PackedPayloadHashTable
+ ${GFLAGS_LIB_NAME}  # The header DECLAREs num_workers and use_latch.
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationHandle
+ quickstep_expressions_aggregation_AggFunc
+ quickstep_storage_AggregationUtil
quickstep_storage_HashTableBase
quickstep_storage_HashTableKeyManager
quickstep_storage_StorageBlob
@@ -1111,6 +1128,7 @@
target_link_libraries(quickstep_storage
quickstep_storage_AggregationOperationState
quickstep_storage_AggregationOperationState_proto
+ quickstep_storage_AggregationUtil
quickstep_storage_BasicColumnStoreTupleStorageSubBlock
quickstep_storage_BasicColumnStoreValueAccessor
quickstep_storage_BloomFilterIndexSubBlock
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index c92f0ab..4c57cc9 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -29,14 +29,13 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/AggregationUtil.hpp"  // InvokeOn* dispatch helpers (moved out of this file).
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "threading/SpinMutex.hpp"
-#include "types/TypeID.hpp"
-#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "utility/BoolVector.hpp"
#include "glog/logging.h"
@@ -46,163 +45,6 @@
DEFINE_uint64(vt_threadprivate_threshold, 1000000L, "");
DEFINE_bool(use_latch, false, "");
-namespace {
-
-template <typename T>
-using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>;
-
-template <typename FunctorT>
-inline auto InvokeOnKeyType(const Type &type,
- const FunctorT &functor) {
- switch (type.getTypeID()) {
- case TypeID::kInt:
- return functor(static_cast<const IntType&>(type));
- case TypeID::kLong:
- return functor(static_cast<const LongType&>(type));
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <typename FunctorT>
-inline auto InvokeOnType(const Type &type,
- const FunctorT &functor) {
- switch (type.getTypeID()) {
- case TypeID::kInt:
- return functor(static_cast<const IntType&>(type));
- case TypeID::kLong:
- return functor(static_cast<const LongType&>(type));
- case TypeID::kFloat:
- return functor(static_cast<const FloatType&>(type));
- case TypeID::kDouble:
- return functor(static_cast<const DoubleType&>(type));
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <typename FunctorT>
-inline auto InvokeOnBool(const bool &val,
- const FunctorT &functor) {
- if (val) {
- return functor(std::true_type());
- } else {
- return functor(std::false_type());
- }
-}
-
-template <typename FunctorT>
-inline auto InvokeOnBools(const bool &val1,
- const bool &val2,
- const FunctorT &functor) {
- if (val1) {
- if (val2) {
- return functor(std::true_type(), std::true_type());
- } else {
- return functor(std::true_type(), std::false_type());
- }
- } else {
- if (val2) {
- return functor(std::false_type(), std::true_type());
- } else {
- return functor(std::false_type(), std::false_type());
- }
- }
-}
-
-template <typename FunctorT>
-inline auto InvokeOnAggFunc(const AggregationID &agg_id,
- const FunctorT &functor) {
- switch (agg_id) {
- case AggregationID::kSum: {
- return functor(Sum());
- }
- default:
- LOG(FATAL) << "Not supported";
- }
-}
-
-template <typename FunctorT>
-inline auto InvokeIf(const std::true_type &val,
- const FunctorT &functor) {
- return functor();
-}
-
-template <typename FunctorT>
-inline void InvokeIf(const std::false_type &val,
- const FunctorT &functor) {
-}
-
-//template <typename FunctorT>
-//inline void InvokeOnAggFuncIfApplicableToArgType(
-// const AggregationID &agg_id,
-// const Type &arg_type,
-// const FunctorT &functor) {
-// InvokeOnAggFunc(
-// agg_id,
-// [&](const auto &agg_func) -> void {
-// InvokeOnType(
-// arg_type,
-// [&](const auto &arg_type) -> void {
-// using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
-// using ArgT = remove_const_reference_t<decltype(arg_type)>;
-//
-// InvokeIf(
-// typename AggFuncT::template HasAtomicImpl<ArgT>(),
-// [&]() -> void {
-// functor(agg_func, arg_type);
-// });
-// });
-// });
-//}
-
-template <typename FunctorT>
-inline void InvokeOnAggFuncWithArgType(
- const AggregationID &agg_id,
- const Type &arg_type,
- const FunctorT &functor) {
- InvokeOnAggFunc(
- agg_id,
- [&](const auto &agg_func) -> void {
- InvokeOnType(
- arg_type,
- [&](const auto &arg_type) -> void {
- functor(agg_func, arg_type);
- });
- });
-}
-
-template <typename FunctorT>
-inline auto InvokeOnTwoAccessors(
- const ValueAccessorMultiplexer &accessor_mux,
- const ValueAccessorSource &first_source,
- const ValueAccessorSource &second_source,
- const FunctorT &functor) {
- ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
- ColumnVectorsValueAccessor *derived_accessor =
- static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
-
- InvokeOnAnyValueAccessor(
- base_accessor,
- [&](auto *accessor) {
- if (first_source == ValueAccessorSource::kBase) {
- if (second_source == ValueAccessorSource::kBase) {
- return functor(std::false_type(), accessor, accessor);
- } else {
- return functor(std::true_type(), accessor, derived_accessor);
- }
- } else {
- if (second_source == ValueAccessorSource::kBase) {
- return functor(std::true_type(), derived_accessor, accessor);
- } else {
- return functor(std::false_type(), derived_accessor, derived_accessor);
- }
- }
- });
-}
-
-} // namespace
-
CollisionFreeVectorTable::CollisionFreeVectorTable(
const Type *key_type,
const std::size_t num_entries,
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index bd7e960..c9d956c 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -26,6 +26,8 @@
#include <vector>
#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggFunc.hpp"
+#include "storage/AggregationUtil.hpp"  // InvokeOnAggFuncWithArgType(), remove_const_reference_t.
#include "storage/HashTableKeyManager.hpp"
#include "storage/StorageBlob.hpp"
#include "storage/StorageBlockInfo.hpp"
@@ -53,28 +55,14 @@
: key_types_(key_types),
num_handles_(handles.size()),
handles_(handles),
- total_payload_size_(ComputeTotalPayloadSize(handles)),
+ state_sizes_(ComputeStateSizes(handles)),  // Byte size of each aggregate's state.
+ total_payload_size_(ComputeTotalPayloadSize(state_sizes_)),  // Needs state_sizes_, which is declared (so initialized) first.
+ state_offsets_(ComputeStateOffsets(state_sizes_)),  // Byte offset of each state within a bucket's payload.
storage_manager_(storage_manager),
kBucketAlignment(alignof(std::atomic<std::size_t>)),
kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
key_manager_(key_types_, kValueOffset + total_payload_size_),
bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
- std::size_t payload_offset_running_sum = sizeof(SpinMutex);
- for (const auto *handle : handles) {
- payload_offsets_.emplace_back(payload_offset_running_sum);
- payload_offset_running_sum += handle->getPayloadSize();
- }
-
- // NOTE(jianqiao): Potential memory leak / double freeing by copying from
- // init_payload to buckets if payload contains out of line data.
- init_payload_ =
- static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
- DCHECK(init_payload_ != nullptr);
-
- for (std::size_t i = 0; i < num_handles_; ++i) {
- handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
- }
-
// Bucket size always rounds up to the alignment requirement of the atomic
// size_t "next" pointer at the front or a ValueT, whichever is larger.
//
@@ -192,7 +180,6 @@
blob_.release();
storage_manager_->deleteBlockOrBlobFile(blob_id);
}
- std::free(init_payload_);
}
void PackedPayloadHashTable::clear() {
@@ -214,17 +201,6 @@
}
void PackedPayloadHashTable::destroyPayload() {
- const std::size_t num_buckets =
- header_->buckets_allocated.load(std::memory_order_relaxed);
- void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
- for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
- for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
- void *value_internal_ptr =
- static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
- handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
- }
- bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
- }
}
bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
@@ -232,24 +208,131 @@
const std::vector<MultiSourceAttributeId> &key_attr_ids,
const ValueAccessorMultiplexer &accessor_mux) {
ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
- ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
+ CHECK(accessor_mux.getDerivedAccessor() == nullptr);  // Only base-accessor keys/arguments are handled below.
- base_accessor->beginIterationVirtual();
- if (derived_accessor == nullptr) {
- return upsertValueAccessorCompositeKeyInternal<false>(
- argument_ids,
- key_attr_ids,
- base_accessor,
- nullptr);
- } else {
- DCHECK(derived_accessor->getImplementationType()
- == ValueAccessor::Implementation::kColumnVectors);
- derived_accessor->beginIterationVirtual();
- return upsertValueAccessorCompositeKeyInternal<true>(
- argument_ids,
- key_attr_ids,
- base_accessor,
- static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
+ std::vector<attribute_id> key_ids;
+ for (const auto &key_attr_id : key_attr_ids) {
+ key_ids.emplace_back(key_attr_id.attr_id);
+ }
+
+ for (std::size_t i = 0; i < num_handles_; ++i) {  // Each aggregate is upserted separately.
+ DCHECK_LE(argument_ids[i].size(), 1u);
+
+ const AggregationHandle *handle = handles_[i];
+ const auto &argument_types = handle->getArgumentTypes();
+ const auto &argument_ids_i = argument_ids[i];
+
+ attribute_id argument_id = kInvalidAttributeID;
+ const Type *argument_type = nullptr;
+
+ if (argument_ids_i.empty()) {
+ LOG(FATAL) << "Not supported";  // Aggregates with no argument are not handled.
+ } else {
+ DCHECK_EQ(1u, argument_ids_i.size());
+ argument_id = argument_ids_i.front().attr_id;
+
+ DCHECK_EQ(1u, argument_types.size());
+ argument_type = argument_types.front();
+ }
+
+ InvokeOnAggFuncWithArgType(
+ handle->getAggregationID(),
+ *argument_type,
+ [&](const auto &agg_func, const auto &arg_type) {
+ using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+ using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+ InvokeOnAnyValueAccessor(  // NOTE(review): beginIterationVirtual() is no longer called here; confirm the callees reset iteration per handle.
+ base_accessor,
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+
+ if (key_ids.size() == 1) {  // Single-key vs. composite-key paths; FLAGS_use_latch picks latch vs. atomic states.
+ if (FLAGS_use_latch) {
+ this->upsertValueAccessorInternalUnaryLatch<AggFuncT, ArgT>(
+ argument_id,
+ key_ids.front(),
+ state_offsets_[i],
+ accessor);
+ } else {
+ this->upsertValueAccessorInternalUnaryAtomic<AggFuncT, ArgT>(
+ argument_id,
+ key_ids.front(),
+ state_offsets_[i],
+ accessor);
+ }
+ } else {
+ if (FLAGS_use_latch) {
+ this->upsertValueAccessorCompositeKeyInternalUnaryLatch<AggFuncT, ArgT>(
+ argument_id,
+ key_ids,
+ state_offsets_[i],
+ accessor);
+ } else {
+ this->upsertValueAccessorCompositeKeyInternalUnaryAtomic<AggFuncT, ArgT>(
+ argument_id,
+ key_ids,
+ state_offsets_[i],
+ accessor);
+ }
+ }
+ });
+ });
+ }
+ return true;  // NOTE(review): the bool results of the upsert calls above are ignored.
+}
+
+void PackedPayloadHashTable::finalize(const std::size_t partition_id,
+ ColumnVectorsValueAccessor *results) {
+ const std::size_t start_position =
+ calculatePartitionStartPosition(partition_id);
+ const std::size_t end_position =
+ calculatePartitionEndPosition(partition_id);
+
+ const std::size_t num_entries = end_position - start_position;  // Column capacity for this partition's range.
+
+ for (std::size_t key_idx = 0; key_idx < key_types_.size(); ++key_idx) {  // Key columns come first.
+ NativeColumnVector *key_cv =
+ new NativeColumnVector(*key_types_[key_idx], num_entries);  // NOTE(review): assumes every key type is NativeColumnVector-usable.
+ finalizeKey(start_position, end_position, key_idx, key_cv);
+ results->addColumn(key_cv);
+ }
+
+ for (std::size_t i = 0; i < num_handles_; ++i) {  // Then one result column per aggregate.
+ const AggregationHandle *handle = handles_[i];
+ const auto &argument_types = handle->getArgumentTypes();
+
+ const Type *argument_type = nullptr;
+ if (argument_types.empty()) {
+ LOG(FATAL) << "Not supported";
+ } else {
+ DCHECK_EQ(1u, argument_types.size());
+ argument_type = argument_types.front();
+ }
+
+ NativeColumnVector *result_cv =
+ new NativeColumnVector(*handle->getResultType(), num_entries);
+
+ InvokeOnAggFuncWithArgType(
+ handle->getAggregationID(),
+ *argument_type,
+ [&](const auto &agg_func, const auto &arg_type) {
+ using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+ using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+ if (FLAGS_use_latch) {  // Must match the state layout chosen in ComputeStateSizes().
+ this->finalizeStateLatch<AggFuncT, ArgT>(start_position,
+ end_position,
+ state_offsets_[i],
+ result_cv);
+ } else {
+ this->finalizeStateAtomic<AggFuncT, ArgT>(start_position,
+ end_position,
+ state_offsets_[i],
+ result_cv);
+ }
+ });
+
+ results->addColumn(result_cv);
}
}
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index 9ba5500..1a56f94 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -20,6 +20,7 @@
#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
+#include <algorithm>  // std::max, std::min.
#include <atomic>
#include <cstddef>
#include <cstdint>
@@ -29,6 +30,8 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggFunc.hpp"
+#include "storage/AggregationUtil.hpp"  // InvokeOnAggFuncWithArgType(), remove_const_reference_t.
#include "storage/HashTableBase.hpp"
#include "storage/HashTableKeyManager.hpp"
#include "storage/StorageBlob.hpp"
@@ -38,14 +41,20 @@
#include "threading/SpinMutex.hpp"
#include "threading/SpinSharedMutex.hpp"
#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"  // NativeColumnVector in the finalize helpers.
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "utility/HashPair.hpp"
#include "utility/Macros.hpp"
+#include "gflags/gflags.h"  // DECLARE_int32 / DECLARE_bool below.
+
#include "glog/logging.h"
namespace quickstep {
+DECLARE_int32(num_workers);  // Read when sizing finalization partitions.
+DECLARE_bool(use_latch);  // true: SpinMutex + plain states; false: atomic states.
+
class StorageManager;
class Type;
class ValueAccessor;
@@ -126,6 +135,9 @@
const std::vector<MultiSourceAttributeId> &key_ids,
const ValueAccessorMultiplexer &accessor_mux) override;
+ void finalize(const std::size_t partition_id,  // Appends one partition's key columns, then its aggregate columns.
+ ColumnVectorsValueAccessor *results);
+
/**
* @return The ID of the StorageBlob used to store this hash table.
**/
@@ -364,16 +376,55 @@
const std::uint8_t **value,
std::size_t *entry_num) const;
- inline std::uint8_t* upsertCompositeKeyInternal(
- const std::vector<TypedValue> &key,
- const std::size_t variable_key_size);
+ inline std::uint8_t* upsertInternal(const TypedValue &key);
- template <bool use_two_accessors>
- inline bool upsertValueAccessorCompositeKeyInternal(
- const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
- const std::vector<MultiSourceAttributeId> &key_ids,
- ValueAccessor *base_accessor,
- ColumnVectorsValueAccessor *derived_accessor);
+ inline std::uint8_t* upsertCompositeKeyInternal(
+ const std::vector<TypedValue> &key);
+
+ template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+ inline bool upsertValueAccessorInternalUnaryAtomic(  // One key column, !FLAGS_use_latch.
+ const attribute_id argument_id,
+ const attribute_id key_id,
+ const std::size_t state_offset,
+ ValueAccessorT *accessor);
+
+ template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+ inline bool upsertValueAccessorInternalUnaryLatch(  // One key column, FLAGS_use_latch.
+ const attribute_id argument_id,
+ const attribute_id key_id,
+ const std::size_t state_offset,
+ ValueAccessorT *accessor);
+
+ template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+ inline bool upsertValueAccessorCompositeKeyInternalUnaryAtomic(  // Several key columns, !FLAGS_use_latch.
+ const attribute_id argument_id,
+ const std::vector<attribute_id> key_ids,
+ const std::size_t state_offset,
+ ValueAccessorT *accessor);
+
+ template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+ inline bool upsertValueAccessorCompositeKeyInternalUnaryLatch(  // Several key columns, FLAGS_use_latch.
+ const attribute_id argument_id,
+ const std::vector<attribute_id> key_ids,
+ const std::size_t state_offset,
+ ValueAccessorT *accessor);
+
+ inline void finalizeKey(const std::size_t start_position,  // Fills output_cv with key component key_idx.
+ const std::size_t end_position,
+ const std::size_t key_idx,
+ NativeColumnVector *output_cv);
+
+ template <typename AggFuncT, typename ArgT>
+ inline void finalizeStateAtomic(const std::size_t start_position,  // Used when !FLAGS_use_latch.
+ const std::size_t end_position,
+ const std::size_t state_offset,
+ NativeColumnVector *results);
+
+ template <typename AggFuncT, typename ArgT>
+ inline void finalizeStateLatch(const std::size_t start_position,  // Used when FLAGS_use_latch.
+ const std::size_t end_position,
+ const std::size_t state_offset,
+ NativeColumnVector *results);
// Generate a hash for a composite key by hashing each component of 'key' and
// mixing their bits with CombineHashes().
@@ -387,33 +438,62 @@
key_inline_ = key_inline;
}
- inline static std::size_t ComputeTotalPayloadSize(
+ inline static std::vector<std::size_t> ComputeStateSizes(
const std::vector<AggregationHandle *> &handles) {
- std::size_t total_payload_size = sizeof(SpinMutex);
- for (const auto *handle : handles) {
- total_payload_size += handle->getPayloadSize();
+ std::vector<std::size_t> state_sizes;
+ for (const AggregationHandle *handle : handles) {
+ CHECK_EQ(1u, handle->getArgumentTypes().size());  // Only single-argument aggregates; front() below must be valid.
+ InvokeOnAggFuncWithArgType(
+ handle->getAggregationID(),
+ *handle->getArgumentTypes().front(),
+ [&](const auto &agg_func, const auto &arg_type) {
+ using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+ using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+ if (FLAGS_use_latch) {  // Latch mode stores plain T; otherwise AtomicT.
+ state_sizes.emplace_back(
+ sizeof(typename AggFuncT::template AggState<ArgT>::T));
+ } else {
+ state_sizes.emplace_back(
+ sizeof(typename AggFuncT::template AggState<ArgT>::AtomicT));
+ }
+ });
}
- return total_payload_size;
+ return state_sizes;
+ }
+
+ inline static std::size_t ComputeTotalPayloadSize(
+ const std::vector<std::size_t> &state_sizes) {
+ const std::size_t mutex_size =
+ FLAGS_use_latch ? sizeof(SpinMutex) : 0;
+ const std::size_t total_state_size =
+ std::accumulate(state_sizes.begin(), state_sizes.end(), std::size_t{0});  // size_t seed: an int 0 would accumulate in int.
+ return mutex_size + total_state_size;
+ }
+
+ inline static std::vector<std::size_t> ComputeStateOffsets(
+ const std::vector<std::size_t> &state_sizes) {
+ std::vector<std::size_t> state_offsets;
+ std::size_t state_offset =
+ FLAGS_use_latch ? sizeof(SpinMutex) : 0;
+ for (const std::size_t state_size : state_sizes) {
+ state_offsets.emplace_back(state_offset);
+ state_offset += state_size;  // NOTE(review): no padding between states; assumes their alignments are compatible.
+ }
+ return state_offsets;
}
// Assign '*key_vector' with the attribute values specified by 'key_ids' at
// the current position of 'accessor'. If 'check_for_null_keys' is true, stops
// and returns true if any of the values is null, otherwise returns false.
- template <bool use_two_accessors,
- bool check_for_null_keys,
+ template <bool check_for_null_keys,
typename ValueAccessorT>
inline static bool GetCompositeKeyFromValueAccessor(
- const std::vector<MultiSourceAttributeId> &key_ids,
+ const std::vector<attribute_id> &key_ids,
const ValueAccessorT *accessor,
- const ColumnVectorsValueAccessor *derived_accessor,
std::vector<TypedValue> *key_vector) {
for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) {
- const MultiSourceAttributeId &key_id = key_ids[key_idx];
- if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) {
- (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id);
- } else {
- (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id);
- }
+ (*key_vector)[key_idx] = accessor->getTypedValue(key_ids[key_idx]);
if (check_for_null_keys && (*key_vector)[key_idx].isNull()) {
return true;
}
@@ -441,9 +521,9 @@
const std::size_t num_handles_;
const std::vector<AggregationHandle *> handles_;
- std::size_t total_payload_size_;
- std::vector<std::size_t> payload_offsets_;
- std::uint8_t *init_payload_;
+ const std::vector<std::size_t> state_sizes_;  // Keep declared before the next two members (constructor init order).
+ const std::size_t total_payload_size_;  // Optional SpinMutex plus the sum of state sizes.
+ const std::vector<std::size_t> state_offsets_;  // Byte offset of each aggregate's state within the payload.
StorageManager *storage_manager_;
MutableBlobReference blob_;
@@ -471,8 +551,8 @@
// Set finalization segment size as 4096 entries.
constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L;
- // At least 1 partition, at most 80 partitions.
- return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL));
+ const auto min_partitions = static_cast<std::size_t>(std::max<std::int32_t>(FLAGS_num_workers, 1));  // Guard against a non-positive FLAGS_num_workers.
+ return std::max(min_partitions, std::min(num_entries / kFinalizeSegmentSize, 80uL));  // At least max(1, FLAGS_num_workers) partitions.
}
// Attempt to find an empty bucket to insert 'hash_code' into, starting after
@@ -488,7 +568,6 @@
// deallocated after being allocated.
inline bool locateBucketForInsertion(
const std::size_t hash_code,
- const std::size_t variable_key_allocation_required,
void **bucket,
std::atomic<std::size_t> **pending_chain_ptr,
std::size_t *pending_chain_ptr_finish_value);
@@ -636,7 +715,6 @@
inline bool PackedPayloadHashTable::locateBucketForInsertion(
const std::size_t hash_code,
- const std::size_t variable_key_allocation_required,
void **bucket,
std::atomic<std::size_t> **pending_chain_ptr,
std::size_t *pending_chain_ptr_finish_value) {
@@ -652,17 +730,6 @@
std::numeric_limits<std::size_t>::max(),
std::memory_order_acq_rel)) {
// Got to the end of the chain. Allocate a new bucket.
-
- // First, allocate variable-length key storage, if needed (i.e. if this
- // is an upsert and we didn't allocate up-front).
- if (!key_manager_.allocateVariableLengthKeyStorage(
- variable_key_allocation_required)) {
- // Ran out of variable-length storage.
- (*pending_chain_ptr)->store(0, std::memory_order_release);
- *bucket = nullptr;
- return false;
- }
-
const std::size_t allocated_bucket_num =
header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed);
if (allocated_bucket_num >= header_->num_buckets) {
@@ -730,27 +797,7 @@
inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
const std::vector<TypedValue> &key,
const std::size_t index) const {
- DEBUG_ASSERT(this->key_types_.size() == key.size());
-
- const std::size_t hash_code = this->hashCompositeKey(key);
- std::size_t bucket_ref =
- slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
- while (bucket_ref != 0) {
- DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
- const char *bucket =
- static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
- const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
- bucket + sizeof(std::atomic<std::size_t>));
- if ((bucket_hash == hash_code) &&
- key_manager_.compositeKeyCollisionCheck(key, bucket)) {
- // Match located.
- return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) +
- this->payload_offsets_[index];
- }
- bucket_ref =
- reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
- std::memory_order_relaxed);
- }
+ LOG(FATAL) << "Not implemented";
// Reached the end of the chain and didn't find a match.
return nullptr;
@@ -759,24 +806,9 @@
inline bool PackedPayloadHashTable::upsertCompositeKey(
const std::vector<TypedValue> &key,
const std::uint8_t *source_state) {
- const std::size_t variable_size =
- calculateVariableLengthCompositeKeyCopySize(key);
- for (;;) {
- {
- SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
- std::uint8_t *value =
- upsertCompositeKeyInternal(key, variable_size);
- if (value != nullptr) {
- SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
- for (unsigned int k = 0; k < num_handles_; ++k) {
- handles_[k]->mergeStates(source_state + payload_offsets_[k],
- value + payload_offsets_[k]);
- }
- return true;
- }
- }
- resize(0, variable_size);
- }
+ LOG(FATAL) << "Not implemented";
+
+ return true;
}
template <typename FunctorT>
@@ -784,48 +816,56 @@
const std::vector<TypedValue> &key,
FunctorT *functor,
const std::size_t index) {
- const std::size_t variable_size =
- calculateVariableLengthCompositeKeyCopySize(key);
- for (;;) {
- {
- SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
- std::uint8_t *value =
- upsertCompositeKeyInternal(key, variable_size);
- if (value != nullptr) {
- (*functor)(value + payload_offsets_[index]);
- return true;
- }
- }
- resize(0, variable_size);
- }
+ LOG(FATAL) << "Not implemented";
+
+ return true;
}
-
-inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
- const std::vector<TypedValue> &key,
- const std::size_t variable_key_size) {
- if (variable_key_size > 0) {
- // Don't allocate yet, since the key may already be present. However, we
- // do check if either the allocated variable storage space OR the free
- // space is big enough to hold the key (at least one must be true: either
- // the key is already present and allocated, or we need to be able to
- // allocate enough space for it).
- std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
- std::memory_order_relaxed);
- if ((allocated_bytes < variable_key_size) &&
- (allocated_bytes + variable_key_size >
- key_manager_.getVariableLengthKeyStorageSize())) {
+inline std::uint8_t* PackedPayloadHashTable::upsertInternal(
+ const TypedValue &key) {
+ const std::size_t hash_code = key.getHash();
+ void *bucket = nullptr;
+ std::atomic<std::size_t> *pending_chain_ptr;
+ std::size_t pending_chain_ptr_finish_value;
+ for (;;) {
+ if (locateBucketForInsertion(hash_code,
+ &bucket,
+ &pending_chain_ptr,
+ &pending_chain_ptr_finish_value)) {
+ // Found an empty bucket.
+ break;
+ } else if (bucket == nullptr) {
+ // Ran out of buckets or variable-key space.
return nullptr;
+ } else if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+ // Found an already-existing entry for this key.
+ return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) +
+ kValueOffset);
}
}
+ // We are now writing to an empty bucket.
+ // Write the key and hash.
+ writeScalarKeyToBucket(key, hash_code, bucket);
+
+ std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
+// std::memcpy(value, init_payload_, this->total_payload_size_);
+
+ // Update the previous chain pointer to point to the new bucket.
+ pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+ // Return the value.
+ return value;
+}
+
+inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
+ const std::vector<TypedValue> &key) {
const std::size_t hash_code = this->hashCompositeKey(key);
void *bucket = nullptr;
std::atomic<std::size_t> *pending_chain_ptr;
std::size_t pending_chain_ptr_finish_value;
for (;;) {
if (locateBucketForInsertion(hash_code,
- variable_key_size,
&bucket,
&pending_chain_ptr,
&pending_chain_ptr_finish_value)) {
@@ -846,7 +886,8 @@
writeCompositeKeyToBucket(key, hash_code, bucket);
std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
- std::memcpy(value, init_payload_, this->total_payload_size_);
+// std::memcpy(value, init_payload_, this->total_payload_size_);
+ std::memset(value, 0, this->total_payload_size_);
// Update the previous chaing pointer to point to the new bucket.
pending_chain_ptr->store(pending_chain_ptr_finish_value,
@@ -856,72 +897,165 @@
return value;
}
-template <bool use_two_accessors>
-inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
- const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
- const std::vector<MultiSourceAttributeId> &key_ids,
- ValueAccessor *base_accessor,
- ColumnVectorsValueAccessor *derived_accessor) {
- std::size_t variable_size;
+template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+inline bool PackedPayloadHashTable
+ ::upsertValueAccessorInternalUnaryAtomic(
+ const attribute_id argument_id,
+ const attribute_id key_id,
+ const std::size_t state_offset,
+ ValueAccessorT *accessor) {
+ using StateT = typename AggFuncT::template AggState<ArgT>;
+
+ accessor->beginIteration();
+ while (accessor->next()) {
+ TypedValue key = accessor->getTypedValue(key_id);
+ std::uint8_t *payload = this->upsertInternal(key);
+ const auto *argument = static_cast<const typename ArgT::cpptype *>(
+ accessor->template getUntypedValue<false>(argument_id));
+ auto *state =
+ reinterpret_cast<typename StateT::AtomicT *>(payload + state_offset);
+
+ AggFuncT::template MergeArgAtomic<ArgT>(*argument, state);
+ }
+ return true;
+}
+
+template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+inline bool PackedPayloadHashTable
+ ::upsertValueAccessorInternalUnaryLatch(
+ const attribute_id argument_id,
+ const attribute_id key_id,
+ const std::size_t state_offset,
+ ValueAccessorT *accessor) {
+ using StateT = typename AggFuncT::template AggState<ArgT>;
+
+ accessor->beginIteration();
+ while (accessor->next()) {
+ TypedValue key = accessor->getTypedValue(key_id);
+ std::uint8_t *payload = this->upsertInternal(key);
+ const auto *argument = static_cast<const typename ArgT::cpptype *>(
+ accessor->template getUntypedValue<false>(argument_id));
+ auto *state =
+ reinterpret_cast<typename StateT::T *>(payload + state_offset);
+
+ SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(payload)));
+ AggFuncT::template MergeArgUnsafe<ArgT>(*argument, state);
+ }
+ return true;
+}
+
+template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+inline bool PackedPayloadHashTable::
+ upsertValueAccessorCompositeKeyInternalUnaryAtomic(
+ const attribute_id argument_id,
+ const std::vector<attribute_id> key_ids,
+ const std::size_t state_offset,
+ ValueAccessorT *accessor) {
+ using StateT = typename AggFuncT::template AggState<ArgT>;
+
std::vector<TypedValue> key_vector;
key_vector.resize(key_ids.size());
- return InvokeOnAnyValueAccessor(
- base_accessor,
- [&](auto *accessor) -> bool { // NOLINT(build/c++11)
- bool continuing = true;
- while (continuing) {
- {
- continuing = false;
- SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
- while (accessor->next()) {
- if (use_two_accessors) {
- derived_accessor->next();
- }
- if (this->GetCompositeKeyFromValueAccessor<use_two_accessors, true>(
- key_ids,
- accessor,
- derived_accessor,
- &key_vector)) {
- continue;
- }
- variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
- std::uint8_t *value = this->upsertCompositeKeyInternal(
- key_vector, variable_size);
- if (value == nullptr) {
- continuing = true;
- break;
- } else {
- SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
- for (unsigned int k = 0; k < num_handles_; ++k) {
- const auto &ids = argument_ids[k];
- if (ids.empty()) {
- handles_[k]->updateStateNullary(value + payload_offsets_[k]);
- } else {
- const MultiSourceAttributeId &arg_id = ids.front();
- if (use_two_accessors && arg_id.source == ValueAccessorSource::kDerived) {
- DCHECK_NE(arg_id.attr_id, kInvalidAttributeID);
- handles_[k]->updateStateUnary(derived_accessor->getTypedValue(arg_id.attr_id),
- value + payload_offsets_[k]);
- } else {
- handles_[k]->updateStateUnary(accessor->getTypedValue(arg_id.attr_id),
- value + payload_offsets_[k]);
- }
- }
- }
- }
- }
- }
- if (continuing) {
- this->resize(0, variable_size);
- accessor->previous();
- if (use_two_accessors) {
- derived_accessor->previous();
- }
- }
- }
- return true;
- });
+ accessor->beginIteration();
+ while (accessor->next()) {
+ this->GetCompositeKeyFromValueAccessor<false>(key_ids,
+ accessor,
+ &key_vector);
+ std::uint8_t *payload = this->upsertCompositeKeyInternal(key_vector);
+ const auto *argument = static_cast<const typename ArgT::cpptype *>(
+ accessor->template getUntypedValue<false>(argument_id));
+ auto *state =
+ reinterpret_cast<typename StateT::AtomicT *>(payload + state_offset);
+
+ AggFuncT::template MergeArgAtomic<ArgT>(*argument, state);
+ }
+ return true;
+}
+
+template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+inline bool PackedPayloadHashTable::
+ upsertValueAccessorCompositeKeyInternalUnaryLatch(
+ const attribute_id argument_id,
+ const std::vector<attribute_id> key_ids,
+ const std::size_t state_offset,
+ ValueAccessorT *accessor) {
+ using StateT = typename AggFuncT::template AggState<ArgT>;
+
+ std::vector<TypedValue> key_vector;
+ key_vector.resize(key_ids.size());
+
+ accessor->beginIteration();
+ while (accessor->next()) {
+ this->GetCompositeKeyFromValueAccessor<false>(key_ids,
+ accessor,
+ &key_vector);
+ std::uint8_t *payload = this->upsertCompositeKeyInternal(key_vector);
+ const auto *argument = static_cast<const typename ArgT::cpptype *>(
+ accessor->template getUntypedValue<false>(argument_id));
+ auto *state =
+ reinterpret_cast<typename StateT::T *>(payload + state_offset);
+
+ SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(payload)));
+ AggFuncT::template MergeArgUnsafe<ArgT>(*argument, state);
+ }
+ return true;
+}
+
+inline void PackedPayloadHashTable
+ ::finalizeKey(const std::size_t start_position,
+ const std::size_t end_position,
+ const std::size_t key_idx,
+ NativeColumnVector *output_cv) {
+ for (std::size_t i = start_position; i < end_position; ++i) {
+ const char *bucket =
+ static_cast<const char *>(buckets_) + i * bucket_size_;
+ output_cv->appendTypedValue(
+ key_manager_.getKeyComponentTyped(bucket, key_idx));
+ }
+}
+
+template <typename AggFuncT, typename ArgT>
+inline void PackedPayloadHashTable
+ ::finalizeStateAtomic(const std::size_t start_position,
+ const std::size_t end_position,
+ const std::size_t state_offset,
+ NativeColumnVector *output_cv) {
+ using StateT = typename AggFuncT::template AggState<ArgT>;
+ using ResultT = typename StateT::ResultT;
+
+ const std::size_t offset = kValueOffset + state_offset;
+ for (std::size_t i = start_position; i < end_position; ++i) {
+ const char *bucket =
+ static_cast<const char *>(buckets_) + i * bucket_size_;
+ const auto *state =
+ reinterpret_cast<const typename StateT::AtomicT *>(bucket + offset);
+
+ AggFuncT::template FinalizeAtomic<ArgT>(
+ *state,
+ static_cast<ResultT *>(output_cv->getPtrForDirectWrite()));
+ }
+}
+
+template <typename AggFuncT, typename ArgT>
+inline void PackedPayloadHashTable
+ ::finalizeStateLatch(const std::size_t start_position,
+ const std::size_t end_position,
+ const std::size_t state_offset,
+ NativeColumnVector *output_cv) {
+ using StateT = typename AggFuncT::template AggState<ArgT>;
+ using ResultT = typename StateT::ResultT;
+
+ const std::size_t offset = kValueOffset + state_offset;
+ for (std::size_t i = start_position; i < end_position; ++i) {
+ const char *bucket =
+ static_cast<const char *>(buckets_) + i * bucket_size_;
+ const auto *state =
+ reinterpret_cast<const typename StateT::T *>(bucket + offset);
+
+ AggFuncT::template FinalizeUnsafe<ArgT>(
+ *state,
+ static_cast<ResultT *>(output_cv->getPtrForDirectWrite()));
+ }
}
inline void PackedPayloadHashTable::writeScalarKeyToBucket(
@@ -990,7 +1124,7 @@
const std::uint8_t *value_ptr;
while (getNextEntry(&key, &value_ptr, &entry_num)) {
++entries_visited;
- (*functor)(key, value_ptr + payload_offsets_[index]);
+ (*functor)(key, value_ptr + state_offsets_[index]);
key.clear();
}
return entries_visited;
@@ -1044,7 +1178,7 @@
const std::uint8_t *value_ptr;
while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
++entries_visited;
- (*functor)(key, value_ptr + payload_offsets_[index]);
+ (*functor)(key, value_ptr + state_offsets_[index]);
key.clear();
}
return entries_visited;