blob: c92f0ab624e0263d969eacd38b151272eb27f314 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "storage/CollisionFreeVectorTable.hpp"
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <type_traits>
#include <vector>
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationID.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "storage/ValueAccessor.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
#include "storage/ValueAccessorUtil.hpp"
#include "threading/SpinMutex.hpp"
#include "types/TypeID.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "utility/BoolVector.hpp"
#include "glog/logging.h"
namespace quickstep {
DEFINE_uint64(vt_threadprivate_threshold, 1000000L, "");
DEFINE_bool(use_latch, false, "");
namespace {
template <typename T>
using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>;
template <typename FunctorT>
inline auto InvokeOnKeyType(const Type &type,
const FunctorT &functor) {
switch (type.getTypeID()) {
case TypeID::kInt:
return functor(static_cast<const IntType&>(type));
case TypeID::kLong:
return functor(static_cast<const LongType&>(type));
default:
LOG(FATAL) << "Not supported";
}
}
template <typename FunctorT>
inline auto InvokeOnType(const Type &type,
const FunctorT &functor) {
switch (type.getTypeID()) {
case TypeID::kInt:
return functor(static_cast<const IntType&>(type));
case TypeID::kLong:
return functor(static_cast<const LongType&>(type));
case TypeID::kFloat:
return functor(static_cast<const FloatType&>(type));
case TypeID::kDouble:
return functor(static_cast<const DoubleType&>(type));
default:
LOG(FATAL) << "Not supported";
}
}
template <typename FunctorT>
inline auto InvokeOnBool(const bool &val,
const FunctorT &functor) {
if (val) {
return functor(std::true_type());
} else {
return functor(std::false_type());
}
}
template <typename FunctorT>
inline auto InvokeOnBools(const bool &val1,
const bool &val2,
const FunctorT &functor) {
if (val1) {
if (val2) {
return functor(std::true_type(), std::true_type());
} else {
return functor(std::true_type(), std::false_type());
}
} else {
if (val2) {
return functor(std::false_type(), std::true_type());
} else {
return functor(std::false_type(), std::false_type());
}
}
}
template <typename FunctorT>
inline auto InvokeOnAggFunc(const AggregationID &agg_id,
const FunctorT &functor) {
switch (agg_id) {
case AggregationID::kSum: {
return functor(Sum());
}
default:
LOG(FATAL) << "Not supported";
}
}
template <typename FunctorT>
inline auto InvokeIf(const std::true_type &val,
const FunctorT &functor) {
return functor();
}
template <typename FunctorT>
inline void InvokeIf(const std::false_type &val,
const FunctorT &functor) {
}
//template <typename FunctorT>
//inline void InvokeOnAggFuncIfApplicableToArgType(
// const AggregationID &agg_id,
// const Type &arg_type,
// const FunctorT &functor) {
// InvokeOnAggFunc(
// agg_id,
// [&](const auto &agg_func) -> void {
// InvokeOnType(
// arg_type,
// [&](const auto &arg_type) -> void {
// using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
// using ArgT = remove_const_reference_t<decltype(arg_type)>;
//
// InvokeIf(
// typename AggFuncT::template HasAtomicImpl<ArgT>(),
// [&]() -> void {
// functor(agg_func, arg_type);
// });
// });
// });
//}
template <typename FunctorT>
inline void InvokeOnAggFuncWithArgType(
const AggregationID &agg_id,
const Type &arg_type,
const FunctorT &functor) {
InvokeOnAggFunc(
agg_id,
[&](const auto &agg_func) -> void {
InvokeOnType(
arg_type,
[&](const auto &arg_type) -> void {
functor(agg_func, arg_type);
});
});
}
template <typename FunctorT>
inline auto InvokeOnTwoAccessors(
const ValueAccessorMultiplexer &accessor_mux,
const ValueAccessorSource &first_source,
const ValueAccessorSource &second_source,
const FunctorT &functor) {
ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
ColumnVectorsValueAccessor *derived_accessor =
static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
InvokeOnAnyValueAccessor(
base_accessor,
[&](auto *accessor) {
if (first_source == ValueAccessorSource::kBase) {
if (second_source == ValueAccessorSource::kBase) {
return functor(std::false_type(), accessor, accessor);
} else {
return functor(std::true_type(), accessor, derived_accessor);
}
} else {
if (second_source == ValueAccessorSource::kBase) {
return functor(std::true_type(), derived_accessor, accessor);
} else {
return functor(std::false_type(), derived_accessor, derived_accessor);
}
}
});
}
} // namespace
CollisionFreeVectorTable::CollisionFreeVectorTable(
const Type *key_type,
const std::size_t num_entries,
const std::vector<AggregationHandle *> &handles,
StorageManager *storage_manager)
: key_type_(key_type),
num_entries_(num_entries),
num_handles_(handles.size()),
handles_(handles),
use_thread_private_existence_map_(num_entries_ < FLAGS_vt_threadprivate_threshold),
num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)),
storage_manager_(storage_manager) {
DCHECK_GT(num_entries, 0u);
std::size_t required_memory = 0;
const std::size_t existence_map_offset = 0;
std::size_t mutex_vec_offset = 0;
std::vector<std::size_t> state_offsets;
if (!use_thread_private_existence_map_) {
required_memory += CacheLineAlignedBytes(
BarrieredReadWriteConcurrentBoolVector::BytesNeeded(num_entries));
}
if (FLAGS_use_latch) {
mutex_vec_offset = required_memory;
required_memory += CacheLineAlignedBytes(num_entries * sizeof(SpinMutex));
}
for (std::size_t i = 0; i < num_handles_; ++i) {
const AggregationHandle *handle = handles_[i];
const std::vector<const Type *> argument_types = handle->getArgumentTypes();
DCHECK_EQ(1u, argument_types.size());
std::size_t state_size = 0;
InvokeOnAggFuncWithArgType(
handle->getAggregationID(),
*argument_types.front(),
[&](const auto &agg_func, const auto &arg_type) {
using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
using ArgT = remove_const_reference_t<decltype(arg_type)>;
if (FLAGS_use_latch) {
state_size = sizeof(typename AggFuncT::template AggState<ArgT>::T);
} else {
state_size = sizeof(typename AggFuncT::template AggState<ArgT>::AtomicT);
}
});
state_offsets.emplace_back(required_memory);
required_memory += CacheLineAlignedBytes(state_size * num_entries);
}
const std::size_t num_storage_slots =
storage_manager_->SlotsNeededForBytes(required_memory);
const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
blob_ = storage_manager_->getBlobMutable(blob_id);
void *memory_start = blob_->getMemoryMutable();
if (use_thread_private_existence_map_) {
thread_private_existence_map_pool_.reset(new BoolVectorPool(num_entries));
} else {
concurrent_existence_map_.reset(new BarrieredReadWriteConcurrentBoolVector(
reinterpret_cast<char *>(memory_start) + existence_map_offset,
num_entries,
false /* initialize */));
}
if (FLAGS_use_latch) {
mutex_vec_ = reinterpret_cast<SpinMutex *>(
reinterpret_cast<char *>(memory_start) + mutex_vec_offset);
} else {
mutex_vec_ = nullptr;
}
for (std::size_t i = 0; i < num_handles_; ++i) {
// Columnwise layout.
vec_tables_.emplace_back(
reinterpret_cast<char *>(memory_start) + state_offsets.at(i));
}
memory_size_ = required_memory;
num_init_partitions_ = CalculateNumInitializationPartitions(memory_size_);
}
CollisionFreeVectorTable::~CollisionFreeVectorTable() {
const block_id blob_id = blob_->getID();
blob_.release();
storage_manager_->deleteBlockOrBlobFile(blob_id);
}
void CollisionFreeVectorTable::destroyPayload() {
}
bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey(
const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
const std::vector<MultiSourceAttributeId> &key_ids,
const ValueAccessorMultiplexer &accessor_mux) {
DCHECK_EQ(1u, key_ids.size());
if (handles_.empty()) {
LOG(FATAL) << "Not implemented";
}
const ValueAccessorSource key_source = key_ids.front().source;
const attribute_id key_id = key_ids.front().attr_id;
const bool is_key_nullable = key_type_->isNullable();
for (std::size_t i = 0; i < num_handles_; ++i) {
DCHECK_LE(argument_ids[i].size(), 1u);
const AggregationHandle *handle = handles_[i];
const auto &argument_types = handle->getArgumentTypes();
const auto &argument_ids_i = argument_ids[i];
ValueAccessorSource argument_source;
attribute_id argument_id;
const Type *argument_type;
bool is_argument_nullable;
if (argument_ids_i.empty()) {
// argument_source = ValueAccessorSource::kInvalid;
// argument_id = kInvalidAttributeID;
//
// DCHECK(argument_types.empty());
// argument_type = nullptr;
// is_argument_nullable = false;
LOG(FATAL) << "Not supported";
} else {
DCHECK_EQ(1u, argument_ids_i.size());
argument_source = argument_ids_i.front().source;
argument_id = argument_ids_i.front().attr_id;
DCHECK_EQ(1u, argument_types.size());
argument_type = argument_types.front();
is_argument_nullable = argument_type->isNullable();
}
InvokeOnAggFuncWithArgType(
handle->getAggregationID(),
*argument_types.front(),
[&](const auto &agg_func, const auto &arg_type) {
using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
using ArgT = remove_const_reference_t<decltype(arg_type)>;
InvokeOnKeyType(
*key_type_,
[&](const auto &key_type) -> void {
using KeyT = remove_const_reference_t<decltype(key_type)>;
InvokeOnBools(
is_key_nullable,
is_argument_nullable,
[&](const auto &is_key_nullable,
const auto &is_argument_nullable) -> void {
using KeyNullableT =
remove_const_reference_t<decltype(is_key_nullable)>;
using ArgNullableT =
remove_const_reference_t<decltype(is_argument_nullable)>;
InvokeOnTwoAccessors(
accessor_mux,
key_source,
argument_source,
[&](const auto &use_two_accessors,
auto *key_accessor,
auto *argument_accessor) {
using UseTwoAccessorsT =
remove_const_reference_t<decltype(use_two_accessors)>;
invokeOnExistenceMap(
[&](auto *existence_map) -> void {
if (FLAGS_use_latch) {
upsertValueAccessorInternalUnaryLatch<
AggFuncT, KeyT, ArgT,
KeyNullableT::value, ArgNullableT::value, UseTwoAccessorsT::value>( key_id,
argument_id,
vec_tables_[i],
existence_map,
key_accessor,
argument_accessor);
} else {
upsertValueAccessorInternalUnaryAtomic<
AggFuncT, KeyT, ArgT,
KeyNullableT::value, ArgNullableT::value, UseTwoAccessorsT::value>( key_id,
argument_id,
vec_tables_[i],
existence_map,
key_accessor,
argument_accessor);
}
});
});
});
});
});
}
return true;
}
void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id,
NativeColumnVector *output_cv) const {
const std::size_t start_position =
calculatePartitionStartPosition(partition_id);
const std::size_t end_position =
calculatePartitionEndPosition(partition_id);
InvokeOnKeyType(
*key_type_,
[&](const auto &key_type) {
using KeyT = remove_const_reference_t<decltype(key_type)>;
invokeOnExistenceMapFinal(
[&](const auto *existence_map) -> void {
finalizeKeyInternal<typename KeyT::cpptype>(
start_position, end_position, existence_map, output_cv);
});
});
}
void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id,
const std::size_t handle_id,
NativeColumnVector *output_cv) const {
const std::size_t start_position =
calculatePartitionStartPosition(partition_id);
const std::size_t end_position =
calculatePartitionEndPosition(partition_id);
const AggregationHandle *handle = handles_[handle_id];
const auto &argument_types = handle->getArgumentTypes();
const Type *argument_type =
argument_types.empty() ? nullptr : argument_types.front();
DCHECK(argument_type != nullptr);
InvokeOnAggFuncWithArgType(
handle->getAggregationID(),
*argument_type,
[&](const auto &agg_func, const auto &arg_type) {
using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
using ArgT = remove_const_reference_t<decltype(arg_type)>;
invokeOnExistenceMapFinal(
[&](const auto *existence_map) -> void {
if (FLAGS_use_latch) {
finalizeStateInternalLatch<AggFuncT, ArgT>(start_position,
end_position,
vec_tables_[handle_id],
existence_map,
output_cv);
} else {
finalizeStateInternalAtomic<AggFuncT, ArgT>(start_position,
end_position,
vec_tables_[handle_id],
existence_map,
output_cv);
}
});
});
}
} // namespace quickstep