Merge branch 'master' into in-operator-support
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index eea94cd..ac41912 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -168,11 +168,13 @@
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
+ quickstep_storage_SubBlocksReference
quickstep_storage_TupleReference
quickstep_storage_TupleStorageSubBlock
quickstep_storage_ValueAccessor
quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_utility_Macros
+ quickstep_utility_PtrList
tmb)
target_link_libraries(quickstep_relationaloperators_InsertOperator
glog
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index d947340..404006f 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -35,6 +35,7 @@
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
#include "storage/TupleReference.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
@@ -85,7 +86,7 @@
// Consolidation is a no-op for this version, but we provide this trivial
// call so that MapBasedJoinedTupleCollector and
// VectorBasedJoinedTupleCollector have the same interface and can both be
- // used in the templated HashJoinWorkOrder::executeWithCollectorType() method.
+ // used in the templated HashInnerJoinWorkOrder::executeWithCollectorType() method.
inline void consolidate() const {
}
@@ -183,6 +184,57 @@
consolidated_joined_tuples_;
};
+// Collector used by the hash semi-join and anti-join work orders when there is
+// no residual predicate. It starts from the probe block's existence map and
+// clears the bit of every probe tuple it is invoked on; the bits that remain
+// set form the filter of probe tuples to output.
+class SemiAntiJoinTupleCollector {
+ public:
+  // Initializes the filter from the existence map of 'tuple_store'. The
+  // collector owns the returned TupleIdSequence. The constructor is explicit
+  // so that a TupleStorageSubBlock never converts silently into a collector.
+  explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
+    filter_.reset(tuple_store.getExistenceMap());
+  }
+
+  // Clears the filter bit of the tuple at the accessor's current position.
+  template <typename ValueAccessorT>
+  inline void operator()(const ValueAccessorT &accessor) {
+    filter_->set(accessor.getCurrentPosition(), false);
+  }
+
+  // Returns the filter. Ownership stays with this collector.
+  const TupleIdSequence* filter() const {
+    return filter_.get();
+  }
+
+ private:
+  std::unique_ptr<TupleIdSequence> filter_;
+};
+
+// Collector used by the hash left outer join work order. It records every
+// (build tuple, probe tuple) match grouped by build block, and maintains a
+// filter over the probe block whose set bits are the probe tuples for which
+// recordMatch() was never called.
+class OuterJoinTupleCollector {
+ public:
+  // Initializes the filter from the existence map of 'tuple_store'. The
+  // collector owns the returned TupleIdSequence. The constructor is explicit
+  // so that a TupleStorageSubBlock never converts silently into a collector.
+  explicit OuterJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
+    filter_.reset(tuple_store.getExistenceMap());
+  }
+
+  // Records a match between the build tuple referenced by 'tref' and the probe
+  // tuple at the accessor's current position.
+  template <typename ValueAccessorT>
+  inline void operator()(const ValueAccessorT &accessor,
+                         const TupleReference &tref) {
+    joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition());
+  }
+
+  // Clears the filter bit of the probe tuple at the accessor's current
+  // position, i.e. marks it as having a match.
+  // NOTE(review): presumably invoked by the hash table once per probe tuple on
+  // its first match - confirm against the JoinHashTable implementation.
+  template <typename ValueAccessorT>
+  inline void recordMatch(const ValueAccessorT &accessor) {
+    filter_->set(accessor.getCurrentPosition(), false);
+  }
+
+  // Returns the map from build block id to the (build tuple id, probe tuple id)
+  // pairs recorded for that block. Ownership stays with this collector.
+  inline std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>>*
+      getJoinedTupleMap() {
+    return &joined_tuples_;
+  }
+
+  // Returns the filter of probe tuples without a recorded match. Ownership
+  // stays with this collector.
+  const TupleIdSequence* filter() const {
+    return filter_.get();
+  }
+
+ private:
+  std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_;
+  // BitVector on the probe relation. 1 if the corresponding tuple has no match.
+  std::unique_ptr<TupleIdSequence> filter_;
+};
+
} // namespace
bool HashJoinOperator::getAllWorkOrders(
@@ -191,31 +243,57 @@
StorageManager *storage_manager,
const tmb::client_id foreman_client_id,
tmb::MessageBus *bus) {
+ switch (join_type_) {
+ case JoinType::kInnerJoin:
+ return getAllNonOuterJoinWorkOrders<HashInnerJoinWorkOrder>(
+ container, query_context, storage_manager);
+ case JoinType::kLeftOuterJoin:
+ return getAllOuterJoinWorkOrders(container, query_context,
+ storage_manager);
+ case JoinType::kLeftSemiJoin:
+ return getAllNonOuterJoinWorkOrders<HashSemiJoinWorkOrder>(
+ container, query_context, storage_manager);
+ case JoinType::kLeftAntiJoin:
+ return getAllNonOuterJoinWorkOrders<HashAntiJoinWorkOrder>(
+ container, query_context, storage_manager);
+ default:
+ LOG(FATAL) << "Unknown join type in HashJoinOperator::getAllWorkOrders()";
+ }
+}
+
+template <class JoinWorkOrderClass>
+bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
+ WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager) {
// We wait until the building of global hash table is complete.
if (blocking_dependencies_met_) {
DCHECK(query_context != nullptr);
- const Predicate *residual_predicate = query_context->getPredicate(residual_predicate_index_);
+ const Predicate *residual_predicate =
+ query_context->getPredicate(residual_predicate_index_);
const vector<unique_ptr<const Scalar>> &selection =
- query_context->getScalarGroup(selection_index_);
+ query_context->getScalarGroup(selection_on_probe_index_);
InsertDestination *output_destination =
query_context->getInsertDestination(output_destination_index_);
- JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_);
+ const JoinHashTable &hash_table =
+ *(query_context->getJoinHashTable(hash_table_index_));
if (probe_relation_is_stored_) {
if (!started_) {
for (const block_id probe_block_id : probe_relation_block_ids_) {
container->addNormalWorkOrder(
- new HashJoinWorkOrder(build_relation_,
- probe_relation_,
- join_key_attributes_,
- any_join_key_attributes_nullable_,
- probe_block_id,
- residual_predicate,
- selection,
- output_destination,
- hash_table,
- storage_manager),
+ new JoinWorkOrderClass(
+ build_relation_,
+ probe_relation_,
+ join_key_attributes_,
+ any_join_key_attributes_nullable_,
+ probe_block_id,
+ selection,
+ hash_table,
+ residual_predicate,
+ output_destination,
+ storage_manager),
op_index_);
}
started_ = true;
@@ -224,27 +302,99 @@
} else {
while (num_workorders_generated_ < probe_relation_block_ids_.size()) {
container->addNormalWorkOrder(
- new HashJoinWorkOrder(
+ new JoinWorkOrderClass(
build_relation_,
probe_relation_,
join_key_attributes_,
any_join_key_attributes_nullable_,
probe_relation_block_ids_[num_workorders_generated_],
- residual_predicate,
selection,
- output_destination,
hash_table,
+ residual_predicate,
+ output_destination,
storage_manager),
op_index_);
++num_workorders_generated_;
- } // end while
+ }
return done_feeding_input_relation_;
- } // end else (input_relation_is_stored is false)
- } // end if (blocking_dependencies_met)
+ } // end else (probe_relation_is_stored_)
+ } // end if (blocking_dependencies_met_)
return false;
}
-void HashJoinWorkOrder::execute() {
+// Generates HashOuterJoinWorkOrders, one per probe block, once the blocking
+// dependencies (the hash table build) are met.
+//
+// Returns false while the blocking dependencies are unmet. For a stored probe
+// relation, all work orders are added on the first call and true is returned.
+// Otherwise one work order is added for every entry of
+// probe_relation_block_ids_ that has not been handled yet, and
+// done_feeding_input_relation_ is returned.
+bool HashJoinOperator::getAllOuterJoinWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager) {
+  // We wait until the building of global hash table is complete.
+  if (blocking_dependencies_met_) {
+    DCHECK(query_context != nullptr);
+
+    const vector<unique_ptr<const Scalar>> &selection_on_probe =
+        query_context->getScalarGroup(selection_on_probe_index_);
+    const vector<unique_ptr<const Scalar>> &selection_on_build =
+        query_context->getScalarGroup(selection_on_build_index_);
+    InsertDestination *output_destination =
+        query_context->getInsertDestination(output_destination_index_);
+    const JoinHashTable &hash_table =
+        *(query_context->getJoinHashTable(hash_table_index_));
+
+    // TODO(harshad, jianqiao) Construct the vector below in ExecutionGenerator
+    // and pass it as an argument to the HashJoinOperator.
+    //
+    // Holds the nullable version of the type of every build-side selection
+    // scalar.
+    // NOTE(review): this vector is local to this call, so
+    // HashOuterJoinWorkOrder must keep its own copy rather than a reference -
+    // confirm against the work order's declaration.
+    std::vector<const Type*> selection_on_build_types;
+    selection_on_build_types.reserve(selection_on_build.size());
+    for (const unique_ptr<const Scalar> &build_scalar : selection_on_build) {
+      selection_on_build_types.emplace_back(
+          &build_scalar->getType().getNullableVersion());
+    }
+
+    if (probe_relation_is_stored_) {
+      if (!started_) {
+        for (const block_id probe_block_id : probe_relation_block_ids_) {
+          container->addNormalWorkOrder(
+              new HashOuterJoinWorkOrder(
+                  build_relation_,
+                  probe_relation_,
+                  join_key_attributes_,
+                  any_join_key_attributes_nullable_,
+                  hash_table,
+                  selection_on_probe,
+                  selection_on_build,
+                  selection_on_build_types,
+                  probe_block_id,
+                  output_destination,
+                  storage_manager),
+              op_index_);
+        }
+        started_ = true;
+      }
+      return started_;
+    } else {
+      while (num_workorders_generated_ < probe_relation_block_ids_.size()) {
+        container->addNormalWorkOrder(
+            new HashOuterJoinWorkOrder(
+                build_relation_,
+                probe_relation_,
+                join_key_attributes_,
+                any_join_key_attributes_nullable_,
+                hash_table,
+                selection_on_probe,
+                selection_on_build,
+                selection_on_build_types,
+                probe_relation_block_ids_[num_workorders_generated_],
+                output_destination,
+                storage_manager),
+            op_index_);
+        ++num_workorders_generated_;
+      }
+      return done_feeding_input_relation_;
+    }  // end else (probe_relation_is_stored_)
+  }  // end if (blocking_dependencies_met_)
+  return false;
+}
+
+void HashInnerJoinWorkOrder::execute() {
if (FLAGS_vector_based_joined_tuple_collector) {
executeWithCollectorType<VectorBasedJoinedTupleCollector>();
} else {
@@ -253,7 +403,7 @@
}
template <typename CollectorT>
-void HashJoinWorkOrder::executeWithCollectorType() {
+void HashInnerJoinWorkOrder::executeWithCollectorType() {
BlockReference probe_block(
storage_manager_->getBlock(block_id_, probe_relation_));
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
@@ -261,13 +411,13 @@
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
CollectorT collector;
if (join_key_attributes_.size() == 1) {
- hash_table_->getAllFromValueAccessor(
+ hash_table_.getAllFromValueAccessor(
probe_accessor.get(),
join_key_attributes_.front(),
any_join_key_attributes_nullable_,
&collector);
} else {
- hash_table_->getAllFromValueAccessorCompositeKey(
+ hash_table_.getAllFromValueAccessorCompositeKey(
probe_accessor.get(),
join_key_attributes_,
any_join_key_attributes_nullable_,
@@ -348,4 +498,359 @@
}
}
+// Runs the semi-join for this work order's probe block, choosing the code path
+// by whether a residual predicate is present.
+void HashSemiJoinWorkOrder::execute() {
+  if (residual_predicate_ != nullptr) {
+    executeWithResidualPredicate();
+    return;
+  }
+  executeWithoutResidualPredicate();
+}
+
+void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
+ const relation_id build_relation_id = build_relation_.getID();
+ const relation_id probe_relation_id = probe_relation_.getID();
+
+ BlockReference probe_block = storage_manager_->getBlock(block_id_,
+ probe_relation_);
+ const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+ std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+ // TODO(harshad) - Make this function work with both types of collectors.
+
+ // We collect all the hash matches for the probe block's tuples, as there's a
+ // residual predicate that needs to be applied after collecting these matches.
+ MapBasedJoinedTupleCollector collector;
+ if (join_key_attributes_.size() == 1) {
+ hash_table_.getAllFromValueAccessor(
+ probe_accessor.get(),
+ join_key_attributes_.front(),
+ any_join_key_attributes_nullable_,
+ &collector);
+ } else {
+ hash_table_.getAllFromValueAccessorCompositeKey(
+ probe_accessor.get(),
+ join_key_attributes_,
+ any_join_key_attributes_nullable_,
+ &collector);
+ }
+
+ // Filter over the probe block's tuple IDs; every bit starts out as false.
+ TupleIdSequence filter(probe_store.getMaxTupleID() + 1);
+ filter.setRange(0, filter.length(), false);
+ for (const std::pair<const block_id,
+ std::vector<std::pair<tuple_id, tuple_id>>>
+ &build_block_entry : *collector.getJoinedTuples()) {
+ // First element of the pair build_block_entry is the build block ID
+ // 2nd element of the pair is a vector of pairs, in each of which -
+ // 1st element is a matching tuple ID from the inner (build) relation.
+ // 2nd element is a matching tuple ID from the outer (probe) relation.
+
+ // Get the block from the build relation for this pair of matched tuples.
+ BlockReference build_block =
+ storage_manager_->getBlock(build_block_entry.first, build_relation_);
+ const TupleStorageSubBlock &build_store =
+ build_block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> build_accessor(
+ build_store.createValueAccessor());
+ for (const std::pair<tuple_id, tuple_id> &hash_match
+ : build_block_entry.second) {
+ // For each pair, 1st element is a tuple ID from the build relation in the
+ // given build block, 2nd element is a tuple ID from the probe relation.
+ if (filter.get(hash_match.second)) {
+ // This probe tuple already has a match that passed the residual
+ // predicate, skip it.
+ continue;
+ }
+ if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
+ build_relation_id,
+ hash_match.first,
+ *probe_accessor,
+ probe_relation_id,
+ hash_match.second)) {
+ filter.set(hash_match.second, true);
+ }
+ }
+ }
+
+ SubBlocksReference sub_blocks_ref(probe_store,
+ probe_block->getIndices(),
+ probe_block->getIndicesConsistent());
+
+ std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+ probe_store.createValueAccessor(&filter));
+ ColumnVectorsValueAccessor temp_result;
+ for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+ selection_it != selection_.end();
+ ++selection_it) {
+ temp_result.addColumn((*selection_it)->getAllValues(
+ probe_accessor_with_filter.get(), &sub_blocks_ref));
+ }
+
+ output_destination_->bulkInsertTuples(&temp_result);
+}
+
+// Semi-join of one probe block when there is no residual predicate: emits the
+// selection evaluated over the probe tuples left in the collector's filter.
+void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
+  DCHECK(residual_predicate_ == nullptr);
+
+  BlockReference probe_block(
+      storage_manager_->getBlock(block_id_, probe_relation_));
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+  // The collector's filter starts as the probe block's existence map. Since no
+  // residual predicate has to be evaluated, it is enough to test each probing
+  // key for existence in the hash table: the collector is run for every key
+  // WITHOUT a match and clears that tuple's bit, unlike
+  // executeWithResidualPredicate() which has to gather every match.
+  SemiAntiJoinTupleCollector collector(probe_store);
+  const bool has_single_join_key = (join_key_attributes_.size() == 1u);
+  if (has_single_join_key) {
+    hash_table_.runOverKeysFromValueAccessorIfMatchNotFound(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    hash_table_.runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  // Evaluate every selection scalar over the filtered probe tuples only.
+  std::unique_ptr<ValueAccessor> filtered_probe_accessor(
+      probe_store.createValueAccessor(collector.filter()));
+  ColumnVectorsValueAccessor temp_result;
+  for (const unique_ptr<const Scalar> &scalar : selection_) {
+    temp_result.addColumn(
+        scalar->getAllValues(filtered_probe_accessor.get(), &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
+// Anti-join of one probe block when there is no residual predicate: emits the
+// selection evaluated over the probe tuples left in the collector's filter.
+void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
+  // DCHECK (rather than DEBUG_ASSERT) matches the semi-join counterpart and
+  // the other assertions in this file.
+  DCHECK(residual_predicate_ == nullptr);
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+  // The collector's filter starts as the probe block's existence map.
+  SemiAntiJoinTupleCollector collector(probe_store);
+  // We probe the hash table to find the keys which have an entry in the
+  // hash table.
+  if (join_key_attributes_.size() == 1) {
+    // Call the collector to set the bit to 0 for every key with a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchFound(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    // Call the collector to set the bit to 0 for every key with a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchFoundCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  // Evaluate every selection scalar over the filtered probe tuples only.
+  std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+      probe_store.createValueAccessor(collector.filter()));
+  ColumnVectorsValueAccessor temp_result;
+  for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+       selection_it != selection_.end(); ++selection_it) {
+    temp_result.addColumn((*selection_it)->getAllValues(
+        probe_accessor_with_filter.get(), &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
+ const relation_id build_relation_id = build_relation_.getID();
+ const relation_id probe_relation_id = probe_relation_.getID();
+
+ BlockReference probe_block = storage_manager_->getBlock(block_id_,
+ probe_relation_);
+ const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+ std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+ // TODO(harshad) - Make the following code work with both types of collectors.
+ MapBasedJoinedTupleCollector collector;
+ // We probe the hash table and get all the matches. Unlike
+ // executeWithoutResidualPredicate(), we have to collect all the matching
+ // tuples, because after this step we still have to evaluate the residual
+ // predicate.
+ if (join_key_attributes_.size() == 1) {
+ hash_table_.getAllFromValueAccessor(
+ probe_accessor.get(),
+ join_key_attributes_.front(),
+ any_join_key_attributes_nullable_,
+ &collector);
+ } else {
+ hash_table_.getAllFromValueAccessorCompositeKey(
+ probe_accessor.get(),
+ join_key_attributes_,
+ any_join_key_attributes_nullable_,
+ &collector);
+ }
+
+ // Start the filter from the existence map of the given probe block.
+ std::unique_ptr<TupleIdSequence> filter(probe_store.getExistenceMap());
+ for (const std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
+ &build_block_entry : *collector.getJoinedTuples()) {
+ // First element of the pair build_block_entry is the build block ID
+ // 2nd element of the pair is a vector of pairs, in each of which -
+ // 1st element is a matching tuple ID from the inner (build) relation.
+ // 2nd element is a matching tuple ID from the outer (probe) relation.
+ BlockReference build_block = storage_manager_->getBlock(build_block_entry.first,
+ build_relation_);
+ const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
+ for (const std::pair<tuple_id, tuple_id> &hash_match
+ : build_block_entry.second) {
+ if (!filter->get(hash_match.second)) {
+ // This probe tuple's bit is already cleared, skip it.
+ continue;
+ }
+ if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
+ build_relation_id,
+ hash_match.first,
+ *probe_accessor,
+ probe_relation_id,
+ hash_match.second)) {
+ // Note that the filter marks a match as false, as needed by the anti
+ // join definition.
+ filter->set(hash_match.second, false);
+ }
+ }
+ }
+
+ SubBlocksReference sub_blocks_ref(probe_store,
+ probe_block->getIndices(),
+ probe_block->getIndicesConsistent());
+
+ std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+ probe_store.createValueAccessor(filter.get()));
+ ColumnVectorsValueAccessor temp_result;
+ for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+ selection_it != selection_.end();
+ ++selection_it) {
+ temp_result.addColumn((*selection_it)->getAllValues(probe_accessor_with_filter.get(),
+ &sub_blocks_ref));
+ }
+
+ output_destination_->bulkInsertTuples(&temp_result);
+}
+
+// Left outer join of one probe block. First emits the joined tuples for every
+// recorded (build tuple, probe tuple) match, one bulk insert per build block,
+// then emits the probe tuples left in the collector's filter with the
+// build-side output columns filled with NULLs.
+void HashOuterJoinWorkOrder::execute() {
+  const relation_id build_relation_id = build_relation_.getID();
+  const relation_id probe_relation_id = probe_relation_.getID();
+
+  BlockReference probe_block(
+      storage_manager_->getBlock(block_id_, probe_relation_));
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+  // Probe the hash table; the collector gathers the matches and maintains the
+  // filter over the probe block.
+  OuterJoinTupleCollector collector(probe_store);
+  const bool has_single_join_key = (join_key_attributes_.size() == 1);
+  if (has_single_join_key) {
+    hash_table_.getAllFromValueAccessorWithExtraWorkForFirstMatch(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    hash_table_.getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  // Populate the output tuples for matches, one build block at a time.
+  for (const auto &build_block_entry : *collector.getJoinedTupleMap()) {
+    const block_id build_block_id = build_block_entry.first;
+    // Pairs of (build tuple id, probe tuple id) recorded for this build block.
+    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids =
+        build_block_entry.second;
+
+    BlockReference build_block(
+        storage_manager_->getBlock(build_block_id, build_relation_));
+    const TupleStorageSubBlock &build_store =
+        build_block->getTupleStorageSubBlock();
+    std::unique_ptr<ValueAccessor> build_accessor(
+        build_store.createValueAccessor());
+
+    // Probe-side output columns come first, followed by the build-side ones.
+    ColumnVectorsValueAccessor temp_result;
+    for (const auto &probe_scalar : selection_on_probe_) {
+      temp_result.addColumn(
+          probe_scalar->getAllValuesForJoin(build_relation_id,
+                                            build_accessor.get(),
+                                            probe_relation_id,
+                                            probe_accessor.get(),
+                                            joined_tuple_ids));
+    }
+    for (const auto &build_scalar : selection_on_build_) {
+      temp_result.addColumn(
+          build_scalar->getAllValuesForJoin(build_relation_id,
+                                            build_accessor.get(),
+                                            probe_relation_id,
+                                            probe_accessor.get(),
+                                            joined_tuple_ids));
+    }
+
+    output_destination_->bulkInsertTuples(&temp_result);
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  // Populate the output tuples for non-matches.
+  // NOTE(review): assumes TupleIdSequence::size() counts the set bits (tuples
+  // in the sequence) rather than the sequence's capacity - confirm.
+  const TupleIdSequence *unmatched_filter = collector.filter();
+  const TupleIdSequence::size_type num_tuples_without_matches =
+      unmatched_filter->size();
+  if (num_tuples_without_matches == 0) {
+    return;
+  }
+
+  std::unique_ptr<ValueAccessor> filtered_probe_accessor(
+      probe_store.createValueAccessor(unmatched_filter));
+  ColumnVectorsValueAccessor temp_result;
+  for (const auto &probe_scalar : selection_on_probe_) {
+    temp_result.addColumn(
+        probe_scalar->getAllValues(filtered_probe_accessor.get(),
+                                   &sub_blocks_ref));
+  }
+
+  // The build side has no values for these tuples: add one all-NULL column per
+  // build-side output type.
+  for (const Type *selection_on_build_type : selection_on_build_types_) {
+    if (NativeColumnVector::UsableForType(*selection_on_build_type)) {
+      NativeColumnVector *null_column = new NativeColumnVector(
+          *selection_on_build_type, num_tuples_without_matches);
+      null_column->fillWithNulls();
+      temp_result.addColumn(null_column);
+    } else {
+      IndirectColumnVector *null_column = new IndirectColumnVector(
+          *selection_on_build_type, num_tuples_without_matches);
+      null_column->fillWithValue(TypedValue(selection_on_build_type->getTypeID()));
+      temp_result.addColumn(null_column);
+    }
+  }
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
} // namespace quickstep
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 9b8c87b..4141e14 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -30,6 +30,7 @@
#include "storage/HashTable.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/PtrList.hpp"
#include "glog/logging.h"
@@ -51,11 +52,18 @@
*/
/**
- * @brief An operator which performs a hash-join on two
- * relations.
+ * @brief An operator which performs a hash-join, including inner-join,
+ * semi-join, anti-join and outer-join on two relations.
**/
class HashJoinOperator : public RelationalOperator {
public:
+ enum class JoinType {  // Selects which kind of join work order is generated.
+ kInnerJoin = 0,  // Executed with HashInnerJoinWorkOrder.
+ kLeftOuterJoin,  // Executed with HashOuterJoinWorkOrder.
+ kLeftSemiJoin,  // Executed with HashSemiJoinWorkOrder.
+ kLeftAntiJoin  // Executed with HashAntiJoinWorkOrder.
+ };
+
/**
* @brief Constructor.
*
@@ -93,35 +101,48 @@
* additional filter to pairs of tuples that match the hash-join (i.e.
* key equality) predicate. Effectively, this makes the join predicate
* the conjunction of the key-equality predicate and residual predicate.
- * @param selection_index The group index of Scalars in QueryContext,
- * corresponding to the attributes of the relation referred by
+ * Note that this field is not used for left outer join.
+ * @param selection_on_probe_index The group index of Scalars in QueryContext,
+ * corresponding to the attributes of the probe relation referred by
* output_relation_id. Each Scalar is evaluated for the joined tuples,
* and the resulting value is inserted into the join result.
+ * @param selection_on_build_index The group index of Scalars in QueryContext,
+ * corresponding to the attributes of the build relation referred by
+ * output_relation_id. Each Scalar is evaluated for the joined tuples,
+ * and the resulting value is inserted into the join result.
+ * @param join_type The type of join corresponding to this operator.
**/
- HashJoinOperator(const CatalogRelation &build_relation,
- const CatalogRelation &probe_relation,
- const bool probe_relation_is_stored,
- const std::vector<attribute_id> &join_key_attributes,
- const bool any_join_key_attributes_nullable,
- const CatalogRelation &output_relation,
- const QueryContext::insert_destination_id output_destination_index,
- const QueryContext::join_hash_table_id hash_table_index,
- const QueryContext::predicate_id residual_predicate_index,
- const QueryContext::scalar_group_id selection_index)
- : build_relation_(build_relation),
- probe_relation_(probe_relation),
- probe_relation_is_stored_(probe_relation_is_stored),
- join_key_attributes_(join_key_attributes),
- any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
- output_relation_(output_relation),
- output_destination_index_(output_destination_index),
- hash_table_index_(hash_table_index),
- residual_predicate_index_(residual_predicate_index),
- selection_index_(selection_index),
- probe_relation_block_ids_(probe_relation_is_stored ? probe_relation.getBlocksSnapshot()
- : std::vector<block_id>()),
- num_workorders_generated_(0),
- started_(false) {}
+ HashJoinOperator(
+ const CatalogRelation &build_relation,
+ const CatalogRelation &probe_relation,
+ const bool probe_relation_is_stored,
+ const std::vector<attribute_id> &join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const CatalogRelation &output_relation,
+ const QueryContext::insert_destination_id output_destination_index,
+ const QueryContext::join_hash_table_id hash_table_index,
+ const QueryContext::predicate_id residual_predicate_index,
+ const QueryContext::scalar_group_id selection_on_probe_index,
+ const QueryContext::scalar_group_id selection_on_build_index =
+ QueryContext::kInvalidScalarGroupId,
+ const JoinType join_type = JoinType::kInnerJoin)
+ : build_relation_(build_relation),
+ probe_relation_(probe_relation),
+ probe_relation_is_stored_(probe_relation_is_stored),
+ join_key_attributes_(join_key_attributes),
+ any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+ output_relation_(output_relation),
+ output_destination_index_(output_destination_index),
+ hash_table_index_(hash_table_index),
+ residual_predicate_index_(residual_predicate_index),
+ selection_on_probe_index_(selection_on_probe_index),
+ selection_on_build_index_(selection_on_build_index),
+ join_type_(join_type),
+ probe_relation_block_ids_(probe_relation_is_stored
+ ? probe_relation.getBlocksSnapshot()
+ : std::vector<block_id>()),
+ num_workorders_generated_(0),
+ started_(false) {}
~HashJoinOperator() override {}
@@ -163,6 +184,15 @@
}
private:
+ template <class JoinWorkOrderClass>
+ bool getAllNonOuterJoinWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager);
+
+ bool getAllOuterJoinWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager);
+
const CatalogRelation &build_relation_;
const CatalogRelation &probe_relation_;
const bool probe_relation_is_stored_;
@@ -172,7 +202,9 @@
const QueryContext::insert_destination_id output_destination_index_;
const QueryContext::join_hash_table_id hash_table_index_;
const QueryContext::predicate_id residual_predicate_index_;
- const QueryContext::scalar_group_id selection_index_;
+ const QueryContext::scalar_group_id selection_on_probe_index_;
+ const QueryContext::scalar_group_id selection_on_build_index_;
+ const JoinType join_type_;
std::vector<block_id> probe_relation_block_ids_;
std::size_t num_workorders_generated_;
@@ -183,9 +215,9 @@
};
/**
- * @brief A WorkOrder produced by HashJoinOperator.
+ * @brief An inner join WorkOrder produced by HashJoinOperator.
**/
-class HashJoinWorkOrder : public WorkOrder {
+class HashInnerJoinWorkOrder : public WorkOrder {
public:
/**
* @brief Constructor.
@@ -198,26 +230,26 @@
* probe_relation.
* @param any_join_key_attributes_nullable If any attribute is nullable.
* @param lookup_block_id The block id of the probe_relation.
+ * @param selection A list of Scalars corresponding to the relation attributes
+ * in \c output_destination. Each Scalar is evaluated for the joined
+ * tuples, and the resulting value is inserted into the join result.
+ * @param hash_table The JoinHashTable to use.
* @param residual_predicate If non-null, apply as an additional filter to
* pairs of tuples that match the hash-join (i.e. key equality)
* predicate. Effectively, this makes the join predicate the
* conjunction of the key-equality predicate and residual_predicate.
- * @param selection A list of Scalars corresponding to the relation attributes
- * in \c output_destination. Each Scalar is evaluated for the joined
- * tuples, and the resulting value is inserted into the join result.
* @param output_destination The InsertDestination to insert the join results.
- * @param hash_table The JoinHashTable to use.
* @param storage_manager The StorageManager to use.
**/
- HashJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ HashInnerJoinWorkOrder(const CatalogRelationSchema &build_relation,
const CatalogRelationSchema &probe_relation,
const std::vector<attribute_id> &join_key_attributes,
const bool any_join_key_attributes_nullable,
const block_id lookup_block_id,
- const Predicate *residual_predicate,
const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const JoinHashTable &hash_table,
+ const Predicate *residual_predicate,
InsertDestination *output_destination,
- JoinHashTable *hash_table,
StorageManager *storage_manager)
: build_relation_(build_relation),
probe_relation_(probe_relation),
@@ -226,11 +258,11 @@
block_id_(lookup_block_id),
residual_predicate_(residual_predicate),
selection_(selection),
+ hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- hash_table_(DCHECK_NOTNULL(hash_table)),
storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
- ~HashJoinWorkOrder() override {}
+ ~HashInnerJoinWorkOrder() override {}
/**
* @exception TupleTooLargeForBlock A tuple produced by this join was too
@@ -253,11 +285,233 @@
const Predicate *residual_predicate_;
const std::vector<std::unique_ptr<const Scalar>> &selection_;
+ const JoinHashTable &hash_table_;
InsertDestination *output_destination_;
- JoinHashTable *hash_table_;
StorageManager *storage_manager_;
- DISALLOW_COPY_AND_ASSIGN(HashJoinWorkOrder);
+ DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder);
+};
+
+/**
+ * @brief A left semi-join WorkOrder produced by the HashJoinOperator to execute
+ * EXISTS() clause.
+ **/
+class HashSemiJoinWorkOrder : public WorkOrder {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @note \c build_relation, \c probe_relation, \c selection and
+ * \c hash_table are held by reference (not copied), so they must
+ * outlive this WorkOrder. \c join_key_attributes is copied.
+ *
+ * @param build_relation The relation that the hash table was originally built
+ * on (i.e. the inner relation in the join).
+ * @param probe_relation The relation to probe the hash table with (i.e. the
+ * outer relation in the join).
+ * @param join_key_attributes The IDs of equijoin attributes in \c
+ * probe_relation.
+ * @param any_join_key_attributes_nullable If any attribute is nullable.
+ * @param lookup_block_id The block id of the probe_relation.
+ * @param selection A list of Scalars corresponding to the relation attributes
+ * in \c output_destination. Each Scalar is evaluated for the joined
+ * tuples, and the resulting value is inserted into the join result.
+ * @param hash_table The JoinHashTable to use.
+ * @param residual_predicate If non-null, apply as an additional filter to
+ * pairs of tuples that match the hash-join (i.e. key equality)
+ * predicate. Effectively, this makes the join predicate the
+ * conjunction of the key-equality predicate and residual_predicate.
+ * @param output_destination The InsertDestination to insert the join results.
+ * Must not be null.
+ * @param storage_manager The StorageManager to use. Must not be null.
+ **/
+ HashSemiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ const CatalogRelationSchema &probe_relation,
+ const std::vector<attribute_id> &join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const block_id lookup_block_id,
+ const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const JoinHashTable &hash_table,
+ const Predicate *residual_predicate,
+ InsertDestination *output_destination,
+ StorageManager *storage_manager)
+ : build_relation_(build_relation),
+ probe_relation_(probe_relation),
+ join_key_attributes_(join_key_attributes),
+ any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+ hash_table_(hash_table),
+ selection_(selection),
+ residual_predicate_(residual_predicate),
+ block_id_(lookup_block_id),
+ // Null-check the raw pointers in debug builds, as the inner-join
+ // WorkOrder does.
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+ ~HashSemiJoinWorkOrder() override {}
+
+ void execute() override;
+
+ private:
+ // Separate execution paths for the cases without / with a residual
+ // predicate (both are defined out of line).
+ void executeWithoutResidualPredicate();
+
+ void executeWithResidualPredicate();
+
+ const CatalogRelationSchema &build_relation_;
+ const CatalogRelationSchema &probe_relation_;
+ const std::vector<attribute_id> join_key_attributes_;
+ const bool any_join_key_attributes_nullable_;
+ const JoinHashTable &hash_table_;
+ const std::vector<std::unique_ptr<const Scalar>> &selection_;
+ const Predicate *residual_predicate_;
+ const block_id block_id_;
+
+ InsertDestination *output_destination_;
+ StorageManager *storage_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder);
+};
+
+/**
+ * @brief A left anti-join WorkOrder produced by the HashJoinOperator to execute
+ * NOT EXISTS() clause.
+ **/
+class HashAntiJoinWorkOrder : public WorkOrder {
+ public:
+ /**
+ * @brief Constructor.
+ * TODO(harshad) - Sync the doxygen.
+ *
+ * @note \c build_relation, \c probe_relation, \c selection and
+ * \c hash_table are held by reference (not copied), so they must
+ * outlive this WorkOrder. \c join_key_attributes is copied.
+ *
+ * @param build_relation The relation that the hash table was originally built
+ * on (i.e. the inner relation in the join).
+ * @param probe_relation The relation to probe the hash table with (i.e. the
+ * outer relation in the join).
+ * @param join_key_attributes The IDs of equijoin attributes in \c
+ * probe_relation.
+ * @param any_join_key_attributes_nullable If any attribute is nullable.
+ * @param lookup_block_id The block id of the probe_relation.
+ * @param selection A list of Scalars corresponding to the relation attributes
+ * in \c output_destination. Each Scalar is evaluated for the joined
+ * tuples, and the resulting value is inserted into the join result.
+ * @param hash_table The JoinHashTable to use.
+ * @param residual_predicate If non-null, apply as an additional filter to
+ * pairs of tuples that match the hash-join (i.e. key equality)
+ * predicate. Effectively, this makes the join predicate the
+ * conjunction of the key-equality predicate and residual_predicate.
+ * @param output_destination The InsertDestination to insert the join results.
+ * Must not be null.
+ * @param storage_manager The StorageManager to use. Must not be null.
+ **/
+ HashAntiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ const CatalogRelationSchema &probe_relation,
+ const std::vector<attribute_id> &join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const block_id lookup_block_id,
+ const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const JoinHashTable &hash_table,
+ const Predicate *residual_predicate,
+ InsertDestination *output_destination,
+ StorageManager *storage_manager)
+ : build_relation_(build_relation),
+ probe_relation_(probe_relation),
+ join_key_attributes_(join_key_attributes),
+ any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+ selection_(selection),
+ residual_predicate_(residual_predicate),
+ block_id_(lookup_block_id),
+ hash_table_(hash_table),
+ // Null-check the raw pointers in debug builds, as the inner-join
+ // WorkOrder does.
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+ ~HashAntiJoinWorkOrder() override {}
+
+ // Dispatches to one of the two private execution paths depending on whether
+ // a residual predicate was supplied at construction time.
+ void execute() override {
+ if (residual_predicate_ == nullptr) {
+ executeWithoutResidualPredicate();
+ } else {
+ executeWithResidualPredicate();
+ }
+ }
+
+ private:
+ void executeWithoutResidualPredicate();
+
+ void executeWithResidualPredicate();
+
+ const CatalogRelationSchema &build_relation_;
+ const CatalogRelationSchema &probe_relation_;
+ const std::vector<attribute_id> join_key_attributes_;
+ const bool any_join_key_attributes_nullable_;
+ const std::vector<std::unique_ptr<const Scalar>> &selection_;
+ const Predicate *residual_predicate_;
+ const block_id block_id_;
+ const JoinHashTable &hash_table_;
+
+ InsertDestination *output_destination_;
+ StorageManager *storage_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder);
+};
+
+/**
+ * @brief A left outer join WorkOrder produced by the HashJoinOperator.
+ **/
+class HashOuterJoinWorkOrder : public WorkOrder {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @note Every reference parameter except \c join_key_attributes (which is
+ * copied) is held by reference, so the referenced objects must
+ * outlive this WorkOrder.
+ *
+ * @param build_relation The relation that the hash table was originally built
+ * on (i.e. the inner relation in the join).
+ * @param probe_relation The relation to probe the hash table with (i.e. the
+ * outer relation in the join).
+ * @param join_key_attributes The IDs of equijoin attributes in \c
+ * probe_relation.
+ * @param any_join_key_attributes_nullable If any attribute is nullable.
+ * @param hash_table The JoinHashTable to use.
+ * @param selection_on_probe A list of Scalars from probe relation,
+ * corresponding to the relation attributes in \c output_destination.
+ * @param selection_on_build A list of Scalars from build relation,
+ * corresponding to the relation attributes in \c output_destination.
+ * @param selection_on_build_types The Types that go with
+ * \c selection_on_build (presumably one per Scalar, in the same
+ * order - TODO confirm against the caller).
+ * @param lookup_block_id The block id of the probe_relation.
+ * @param output_destination The InsertDestination to insert the join results.
+ * Must not be null.
+ * @param storage_manager The StorageManager to use. Must not be null.
+ **/
+ HashOuterJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ const CatalogRelationSchema &probe_relation,
+ const std::vector<attribute_id> &join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const JoinHashTable &hash_table,
+ const std::vector<std::unique_ptr<const Scalar>> &selection_on_probe,
+ const std::vector<std::unique_ptr<const Scalar>> &selection_on_build,
+ const std::vector<const Type*> &selection_on_build_types,
+ const block_id lookup_block_id,
+ InsertDestination *output_destination,
+ StorageManager *storage_manager)
+ : build_relation_(build_relation),
+ probe_relation_(probe_relation),
+ join_key_attributes_(join_key_attributes),
+ any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+ hash_table_(hash_table),
+ selection_on_probe_(selection_on_probe),
+ selection_on_build_(selection_on_build),
+ selection_on_build_types_(selection_on_build_types),
+ block_id_(lookup_block_id),
+ // Null-check the raw pointers in debug builds, as the inner-join
+ // WorkOrder does.
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+ ~HashOuterJoinWorkOrder() override {}
+
+ void execute() override;
+
+ private:
+ const CatalogRelationSchema &build_relation_;
+ const CatalogRelationSchema &probe_relation_;
+ const std::vector<attribute_id> join_key_attributes_;
+ const bool any_join_key_attributes_nullable_;
+ const JoinHashTable &hash_table_;
+ const std::vector<std::unique_ptr<const Scalar>> &selection_on_probe_;
+ const std::vector<std::unique_ptr<const Scalar>> &selection_on_build_;
+ const std::vector<const Type*> &selection_on_build_types_;
+
+ const block_id block_id_;
+
+ InsertDestination *output_destination_;
+ StorageManager *storage_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(HashOuterJoinWorkOrder);
};
/** @} */
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index ab74e2c..0178970 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -707,6 +707,89 @@
FunctorT *functor) const;
/**
+ * @brief Lookup (multiple) keys from a ValueAccessor, apply a functor to
+ * the matching values and additionally call a recordMatch() function
+ * of the functor when the first match for a key is found.
+ * @warning This method assumes that no concurrent calls to put(),
+ * putCompositeKey(), putValueAccessor(),
+ * putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(),
+ * upsertValueAccessor(), or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call). Concurrent calls to getSingle(),
+ * getSingleCompositeKey(), getAll(), getAllCompositeKey(),
+ * getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(),
+ * forEach(), and forEachCompositeKey() are safe.
+ * @note This version is for single scalar keys. See also
+ * getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch().
+ *
+ * @param accessor A ValueAccessor which will be used to access keys.
+ * beginIteration() should be called on accessor before calling this
+ * method.
+ * @param key_attr_id The attribute ID of the keys to be read from accessor.
+ * @param check_for_null_keys If true, each key will be checked to see if it
+ * is null before looking it up (null keys are skipped). This must be
+ * set to true if some of the keys that will be read from accessor may
+ * be null.
+ * @param functor A pointer to a functor, which should provide two functions:
+ * 1) An operator that takes 2 arguments: const ValueAccessor& (or better
+ * yet, a templated call operator which takes a const reference to
+ * some subclass of ValueAccessor as its first argument) and
+ * const ValueT&. The operator will be invoked once for each pair of a
+ * key taken from accessor and matching value.
+ * 2) A function recordMatch that takes 1 argument: const ValueAccessor&.
+ * The function will be called only once for a key from accessor when
+ * the first match is found, before the operator is invoked for that
+ * first match.
+ */
+ template <typename FunctorT>
+ void getAllFromValueAccessorWithExtraWorkForFirstMatch(
+ ValueAccessor *accessor,
+ const attribute_id key_attr_id,
+ const bool check_for_null_keys,
+ FunctorT *functor) const;
+
+ /**
+ * @brief Lookup (multiple) keys from a ValueAccessor, apply a functor to
+ * the matching values and additionally call a recordMatch() function
+ * of the functor when the first match for a key is found. Composite
+ * key version.
+ * @warning This method assumes that no concurrent calls to put(),
+ * putCompositeKey(), putValueAccessor(),
+ * putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(),
+ * upsertValueAccessor(), or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call). Concurrent calls to getSingle(),
+ * getSingleCompositeKey(), getAll(), getAllCompositeKey(),
+ * getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(),
+ * forEach(), and forEachCompositeKey() are safe.
+ *
+ * @param accessor A ValueAccessor which will be used to access keys.
+ * beginIteration() should be called on accessor before calling this
+ * method.
+ * @param key_attr_ids The attribute IDs of the components of the composite
+ * keys to be read from accessor.
+ * @param check_for_null_keys If true, each key will be checked to see if it
+ * is null before looking it up (keys with a null component are
+ * skipped). This must be set to true if some of the keys that will be
+ * read from accessor may be null.
+ * @param functor A pointer to a functor, which should provide two functions:
+ * 1) An operator that takes 2 arguments: const ValueAccessor& (or better
+ * yet, a templated call operator which takes a const reference to
+ * some subclass of ValueAccessor as its first argument) and
+ * const ValueT&. The operator will be invoked once for each pair of a
+ * key taken from accessor and matching value.
+ * 2) A function recordMatch that takes 1 argument: const ValueAccessor&.
+ * The function will be called only once for a key from accessor when
+ * the first match is found, before the operator is invoked for that
+ * first match.
+ */
+ template <typename FunctorT>
+ void getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_attr_ids,
+ const bool check_for_null_keys,
+ FunctorT *functor) const;
+
+ /**
* @brief Lookup (multiple) keys from a ValueAccessor and apply a functor to
* the matching values. Composite key version.
*
@@ -746,6 +829,113 @@
FunctorT *functor) const;
/**
+ * @brief Invoke a functor for every key read from a ValueAccessor that has
+ * a match in this hash table. Single scalar key version.
+ *
+ * @param accessor A ValueAccessor which supplies the keys.
+ * beginIteration() should be called on accessor before calling this
+ * method.
+ * @param key_attr_id The attribute ID of the keys to be read from accessor.
+ * @param check_for_null_keys If true, each key is tested for null before it
+ * is looked up (null keys are meant to be skipped). This must be set
+ * to true if some of the keys read from accessor may be null.
+ * @param functor A pointer to a functor providing a call operator that
+ * takes 1 argument: const ValueAccessor&. It is invoked at most once
+ * per key from accessor, and only if that key has a match.
+ */
+ template <typename FunctorT>
+ void runOverKeysFromValueAccessorIfMatchFound(ValueAccessor *accessor,
+ const attribute_id key_attr_id,
+ const bool check_for_null_keys,
+ FunctorT *functor) const {
+ // Forward to the shared implementation with run_if_match_found = true.
+ runOverKeysFromValueAccessor<true>(
+ accessor, key_attr_id, check_for_null_keys, functor);
+ }
+
+ /**
+ * @brief Invoke a functor for every key read from a ValueAccessor that has
+ * a match in this hash table. Composite key version.
+ *
+ * @param accessor A ValueAccessor which supplies the keys.
+ * beginIteration() should be called on accessor before calling this
+ * method.
+ * @param key_attr_ids The attribute IDs of the components of the composite
+ * keys to be read from accessor.
+ * @param check_for_null_keys If true, each key component is tested for null
+ * before the key is looked up (keys with a null component are meant
+ * to be skipped). This must be set to true if some of the keys read
+ * from accessor may be null.
+ * @param functor A pointer to a functor providing a call operator that
+ * takes 1 argument: const ValueAccessor&. It is invoked at most once
+ * per key from accessor, and only if that key has a match.
+ */
+ template <typename FunctorT>
+ void runOverKeysFromValueAccessorIfMatchFoundCompositeKey(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_attr_ids,
+ const bool check_for_null_keys,
+ FunctorT *functor) const {
+ // Forward to the shared implementation with run_if_match_found = true.
+ runOverKeysFromValueAccessorCompositeKey<true>(
+ accessor, key_attr_ids, check_for_null_keys, functor);
+ }
+
+ /**
+ * @brief Invoke a functor for every key read from a ValueAccessor that has
+ * no match in this hash table. Single scalar key version.
+ *
+ * @param accessor A ValueAccessor which supplies the keys.
+ * beginIteration() should be called on accessor before calling this
+ * method.
+ * @param key_attr_id The attribute ID of the keys to be read from accessor.
+ * @param check_for_null_keys If true, each key is tested for null before it
+ * is looked up; a null key is not looked up and is treated as having
+ * no match, so the functor is invoked for it. This must be set to
+ * true if some of the keys read from accessor may be null.
+ * @param functor A pointer to a functor providing a call operator that
+ * takes 1 argument: const ValueAccessor&. It is invoked at most once
+ * per key from accessor, and only if that key has no match.
+ */
+ template <typename FunctorT>
+ void runOverKeysFromValueAccessorIfMatchNotFound(
+ ValueAccessor *accessor,
+ const attribute_id key_attr_id,
+ const bool check_for_null_keys,
+ FunctorT *functor) const {
+ // Forward to the shared implementation with run_if_match_found = false.
+ runOverKeysFromValueAccessor<false>(
+ accessor, key_attr_id, check_for_null_keys, functor);
+ }
+
+ /**
+ * @brief Invoke a functor for every key read from a ValueAccessor that has
+ * no match in this hash table. Composite key version.
+ *
+ * @param accessor A ValueAccessor which supplies the keys.
+ * beginIteration() should be called on accessor before calling this
+ * method.
+ * @param key_attr_ids The attribute IDs of the components of the composite
+ * keys to be read from accessor.
+ * @param check_for_null_keys If true, each key component is tested for null
+ * before the key is looked up; a key with a null component is not
+ * looked up and is treated as having no match, so the functor is
+ * invoked for it. This must be set to true if some of the keys read
+ * from accessor may be null.
+ * @param functor A pointer to a functor providing a call operator that
+ * takes 1 argument: const ValueAccessor&. It is invoked at most once
+ * per key from accessor, and only if that key has no match.
+ */
+ template <typename FunctorT>
+ void runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_attr_ids,
+ const bool check_for_null_keys,
+ FunctorT *functor) const {
+ // Forward to the shared implementation with run_if_match_found = false.
+ runOverKeysFromValueAccessorCompositeKey<false>(
+ accessor, key_attr_ids, check_for_null_keys, functor);
+ }
+
+ /**
* @brief Apply a functor to each key, value pair in this hash table.
*
* @warning This method assumes that no concurrent calls to put(),
@@ -965,6 +1155,10 @@
const ValueT **value,
std::size_t *entry_num) const = 0;
+ // Return true if key exists in the hash table.
+ virtual bool hasKey(const TypedValue &key) const = 0;
+ virtual bool hasCompositeKey(const std::vector<TypedValue> &key) const = 0;
+
// For a resizable HashTable, grow to accomodate more entries. If
// 'extra_buckets' is not zero, it may serve as a "hint" to implementations
// that at least the requested number of extra buckets are required when
@@ -1048,6 +1242,21 @@
return false;
}
+ // If run_if_match_found is true, apply the functor to each key if a match is
+ // found; otherwise, apply the functor if no match is found.
+ template <bool run_if_match_found, typename FunctorT>
+ void runOverKeysFromValueAccessor(ValueAccessor *accessor,
+ const attribute_id key_attr_id,
+ const bool check_for_null_keys,
+ FunctorT *functor) const;
+
+ template <bool run_if_match_found, typename FunctorT>
+ void runOverKeysFromValueAccessorCompositeKey(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_attr_ids,
+ const bool check_for_null_keys,
+ FunctorT *functor) const;
+
// Method containing the actual logic implementing getAllFromValueAccessor().
// Has extra template parameters that control behavior to avoid some
// inner-loop branching.
@@ -1678,6 +1887,184 @@
bool force_key_copy,
bool allow_duplicate_keys>
template <typename FunctorT>
+void HashTable<ValueT,
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ getAllFromValueAccessorWithExtraWorkForFirstMatch(
+ ValueAccessor *accessor,
+ const attribute_id key_attr_id,
+ const bool check_for_null_keys,
+ FunctorT *functor) const {
+ // For each key read from the accessor: skip it if it is null (when null
+ // checking is requested), otherwise walk all of its entries. The functor's
+ // recordMatch() runs once, just before the first matching value is handed
+ // to the functor's call operator.
+ InvokeOnAnyValueAccessor(
+ accessor,
+ [&](auto *typed_accessor) -> void { // NOLINT(build/c++11)
+ while (typed_accessor->next()) {
+ TypedValue probe_key = typed_accessor->getTypedValue(key_attr_id);
+ if (check_for_null_keys && probe_key.isNull()) {
+ continue;
+ }
+ const std::size_t probe_hash = adjust_hashes_ ? AdjustHash(probe_key.getHash())
+ : probe_key.getHash();
+ const ValueT *matched_value;
+ std::size_t entry_pos = 0;
+ if (!getNextEntryForKey(probe_key, probe_hash, &matched_value, &entry_pos)) {
+ // No entry at all for this key.
+ continue;
+ }
+ functor->recordMatch(*typed_accessor);
+ (*functor)(*typed_accessor, *matched_value);
+ if (allow_duplicate_keys) {
+ // Only tables with duplicate keys can hold further entries.
+ while (getNextEntryForKey(probe_key, probe_hash, &matched_value, &entry_pos)) {
+ (*functor)(*typed_accessor, *matched_value);
+ }
+ }
+ }
+ }); // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+template <typename FunctorT>
+void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_attr_ids,
+ const bool check_for_null_keys,
+ FunctorT *functor) const {
+ DEBUG_ASSERT(key_types_.size() == key_attr_ids.size());
+ // Scratch composite key, reused (overwritten) for every tuple.
+ std::vector<TypedValue> composite_key(key_attr_ids.size());
+ InvokeOnAnyValueAccessor(
+ accessor,
+ [&](auto *typed_accessor) -> void { // NOLINT(build/c++11)
+ while (typed_accessor->next()) {
+ // Gather the key components, stopping at the first null component
+ // when null checking is requested.
+ bool found_null = false;
+ for (std::vector<attribute_id>::size_type component = 0;
+ (component < key_types_.size()) && !found_null;
+ ++component) {
+ composite_key[component] = typed_accessor->getTypedValue(key_attr_ids[component]);
+ found_null = check_for_null_keys && composite_key[component].isNull();
+ }
+ if (found_null) {
+ continue;
+ }
+
+ const std::size_t probe_hash = adjust_hashes_ ? AdjustHash(hashCompositeKey(composite_key))
+ : hashCompositeKey(composite_key);
+ const ValueT *matched_value;
+ std::size_t entry_pos = 0;
+ if (!getNextEntryForCompositeKey(composite_key, probe_hash, &matched_value, &entry_pos)) {
+ // No entry at all for this key.
+ continue;
+ }
+ // recordMatch() runs once, before the first matching value is passed
+ // to the functor's call operator.
+ functor->recordMatch(*typed_accessor);
+ (*functor)(*typed_accessor, *matched_value);
+ if (allow_duplicate_keys) {
+ // Only tables with duplicate keys can hold further entries.
+ while (getNextEntryForCompositeKey(composite_key, probe_hash, &matched_value, &entry_pos)) {
+ (*functor)(*typed_accessor, *matched_value);
+ }
+ }
+ }
+ }); // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+template <bool run_if_match_found, typename FunctorT>
+void HashTable<ValueT,
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>::
+ runOverKeysFromValueAccessor(ValueAccessor *accessor,
+ const attribute_id key_attr_id,
+ const bool check_for_null_keys,
+ FunctorT *functor) const {
+ // Shared implementation of runOverKeysFromValueAccessorIfMatchFound()
+ // (run_if_match_found == true) and
+ // runOverKeysFromValueAccessorIfMatchNotFound() (run_if_match_found ==
+ // false). The functor is invoked at most once per key read from accessor.
+ InvokeOnAnyValueAccessor(
+ accessor,
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+ while (accessor->next()) {
+ TypedValue key = accessor->getTypedValue(key_attr_id);
+ if (check_for_null_keys && key.isNull()) {
+ // A null key is never looked up: it counts as "no match", so only
+ // the match-not-found variant invokes the functor. In both
+ // variants we must move on to the next key here.
+ if (!run_if_match_found) {
+ (*functor)(*accessor);
+ }
+ continue;
+ }
+ // Invoke the functor when the lookup outcome is the requested one.
+ if (hasKey(key) == run_if_match_found) {
+ (*functor)(*accessor);
+ }
+ }
+ }); // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+template <bool run_if_match_found, typename FunctorT>
+void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::runOverKeysFromValueAccessorCompositeKey(ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_attr_ids,
+ const bool check_for_null_keys,
+ FunctorT *functor) const {
+ // Shared implementation of
+ // runOverKeysFromValueAccessorIfMatchFoundCompositeKey()
+ // (run_if_match_found == true) and
+ // runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey()
+ // (run_if_match_found == false). The functor is invoked at most once per
+ // key read from accessor.
+ DEBUG_ASSERT(key_types_.size() == key_attr_ids.size());
+ // Scratch composite key, reused (overwritten) for every tuple.
+ std::vector<TypedValue> key_vector;
+ key_vector.resize(key_attr_ids.size());
+ InvokeOnAnyValueAccessor(
+ accessor,
+ [&](auto *accessor) -> void { // NOLINT(build/c++11)
+ while (accessor->next()) {
+ bool null_key = false;
+ for (std::vector<attribute_id>::size_type key_idx = 0;
+ key_idx < key_types_.size();
+ ++key_idx) {
+ key_vector[key_idx] = accessor->getTypedValue(key_attr_ids[key_idx]);
+ if (check_for_null_keys && key_vector[key_idx].isNull()) {
+ null_key = true;
+ break;
+ }
+ }
+ if (null_key) {
+ // A key with a null component is never looked up: key_vector is
+ // only partially refreshed at this point (later components still
+ // hold values from an earlier tuple), so probing with it could
+ // report a bogus match. It counts as "no match", hence only the
+ // match-not-found variant invokes the functor.
+ if (!run_if_match_found) {
+ (*functor)(*accessor);
+ }
+ continue;
+ }
+
+ // Invoke the functor when the lookup outcome is the requested one.
+ if (hasCompositeKey(key_vector) == run_if_match_found) {
+ (*functor)(*accessor);
+ }
+ }
+ }); // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+template <typename FunctorT>
std::size_t HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
::forEach(FunctorT *functor) const {
std::size_t entries_visited = 0;
diff --git a/storage/LinearOpenAddressingHashTable.hpp b/storage/LinearOpenAddressingHashTable.hpp
index e5ca0b0..a2ebe18 100644
--- a/storage/LinearOpenAddressingHashTable.hpp
+++ b/storage/LinearOpenAddressingHashTable.hpp
@@ -151,6 +151,9 @@
const ValueT **value,
std::size_t *entry_num) const override;
+ bool hasKey(const TypedValue &key) const override;
+ bool hasCompositeKey(const std::vector<TypedValue> &key) const override;
+
void resize(const std::size_t extra_buckets,
const std::size_t extra_variable_storage,
const std::size_t retry_num = 0) override;
@@ -1099,6 +1102,75 @@
return false;
}
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool LinearOpenAddressingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::hasKey(const TypedValue &key) const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ // Probe linearly, starting at the bucket selected by the adjusted hash and
+ // continuing through the overflow buckets.
+ const std::size_t wanted_hash = this->AdjustHash(key.getHash());
+ std::size_t probe_pos = wanted_hash % header_->num_buckets;
+ while (probe_pos < header_->num_buckets + header_->num_overflow_buckets) {
+ const char *candidate = static_cast<const char*>(hash_buckets_) + probe_pos * bucket_size_;
+ // The hash is stored as an atomic at the very start of the bucket.
+ const std::atomic<std::size_t> *hash_slot
+ = reinterpret_cast<const std::atomic<std::size_t>*>(candidate);
+ const std::size_t stored_hash = hash_slot->load(std::memory_order_relaxed);
+ if (stored_hash == kEmptyHash) {
+ // An empty bucket ends the probe sequence: the key is absent.
+ return false;
+ }
+
+ // Lookups must not be issued while inserts are still taking place.
+ DEBUG_ASSERT(stored_hash != kPendingHash);
+
+ if (stored_hash == wanted_hash) {
+ // Equal hashes: let the key manager confirm that the keys are equal.
+ if (key_manager_.scalarKeyCollisionCheck(key, candidate)) {
+ return true;
+ }
+ }
+ ++probe_pos;
+ }
+ // Ran past the last (overflow) bucket without finding the key.
+ return false;
+}
+
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool LinearOpenAddressingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::hasCompositeKey(const std::vector<TypedValue> &key) const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ // Probe linearly, starting at the bucket selected by the adjusted hash and
+ // continuing through the overflow buckets.
+ const std::size_t wanted_hash = this->AdjustHash(this->hashCompositeKey(key));
+ std::size_t probe_pos = wanted_hash % header_->num_buckets;
+ while (probe_pos < header_->num_buckets + header_->num_overflow_buckets) {
+ const char *candidate = static_cast<const char*>(hash_buckets_) + probe_pos * bucket_size_;
+ // The hash is stored as an atomic at the very start of the bucket.
+ const std::atomic<std::size_t> *hash_slot
+ = reinterpret_cast<const std::atomic<std::size_t>*>(candidate);
+ const std::size_t stored_hash = hash_slot->load(std::memory_order_relaxed);
+ if (stored_hash == kEmptyHash) {
+ // An empty bucket ends the probe sequence: the key is absent.
+ return false;
+ }
+
+ // Lookups must not be issued while inserts are still taking place.
+ DEBUG_ASSERT(stored_hash != kPendingHash);
+
+ if (stored_hash == wanted_hash) {
+ // Equal hashes: let the key manager confirm that the keys are equal.
+ if (key_manager_.compositeKeyCollisionCheck(key, candidate)) {
+ return true;
+ }
+ }
+ ++probe_pos;
+ }
+ // Ran past the last (overflow) bucket without finding the key.
+ return false;
+}
+
// TODO(chasseur): Smarter heuristics that are more selective about whether
// to grow hash buckets, variable-length storage, or both, and to what degree.
template <typename ValueT,
diff --git a/storage/SeparateChainingHashTable.hpp b/storage/SeparateChainingHashTable.hpp
index c93e783..c096b1b 100644
--- a/storage/SeparateChainingHashTable.hpp
+++ b/storage/SeparateChainingHashTable.hpp
@@ -145,6 +145,9 @@
const ValueT **value,
std::size_t *entry_num) const override;
+ bool hasKey(const TypedValue &key) const override;
+ bool hasCompositeKey(const std::vector<TypedValue> &key) const override;
+
void resize(const std::size_t extra_buckets,
const std::size_t extra_variable_storage,
const std::size_t retry_num = 0) override;
@@ -1054,6 +1057,57 @@
bool serializable,
bool force_key_copy,
bool allow_duplicate_keys>
+bool SeparateChainingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::hasKey(const TypedValue &key) const {
+ DEBUG_ASSERT(this->key_types_.size() == 1);
+ DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ // Walk the chain hanging off the key's slot. Bucket references are 1-based;
+ // a reference of 0 marks the end of the chain.
+ const std::size_t probe_hash = key.getHash();
+ std::size_t chain_ref = slots_[probe_hash % header_->num_slots].load(std::memory_order_relaxed);
+ while (chain_ref != 0) {
+ DEBUG_ASSERT(chain_ref != std::numeric_limits<std::size_t>::max());
+ const char *entry = static_cast<const char*>(buckets_) + (chain_ref - 1) * bucket_size_;
+ // A bucket starts with the atomic reference to the next bucket in the
+ // chain, immediately followed by the stored hash.
+ const std::atomic<std::size_t> *next_ref
+ = reinterpret_cast<const std::atomic<std::size_t>*>(entry);
+ const std::size_t *entry_hash
+ = reinterpret_cast<const std::size_t*>(entry + sizeof(std::atomic<std::size_t>));
+ if (*entry_hash == probe_hash) {
+ // Equal hashes: let the key manager confirm that the keys are equal.
+ if (key_manager_.scalarKeyCollisionCheck(key, entry)) {
+ return true;
+ }
+ }
+ chain_ref = next_ref->load(std::memory_order_relaxed);
+ }
+ // End of chain reached without finding the key.
+ return false;
+}
+
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+bool SeparateChainingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+ ::hasCompositeKey(const std::vector<TypedValue> &key) const {
+ DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+ // Walk the chain hanging off the key's slot. Bucket references are 1-based;
+ // a reference of 0 marks the end of the chain.
+ const std::size_t probe_hash = this->hashCompositeKey(key);
+ std::size_t chain_ref = slots_[probe_hash % header_->num_slots].load(std::memory_order_relaxed);
+ while (chain_ref != 0) {
+ DEBUG_ASSERT(chain_ref != std::numeric_limits<std::size_t>::max());
+ const char *entry = static_cast<const char*>(buckets_) + (chain_ref - 1) * bucket_size_;
+ // A bucket starts with the atomic reference to the next bucket in the
+ // chain, immediately followed by the stored hash.
+ const std::atomic<std::size_t> *next_ref
+ = reinterpret_cast<const std::atomic<std::size_t>*>(entry);
+ const std::size_t *entry_hash
+ = reinterpret_cast<const std::size_t*>(entry + sizeof(std::atomic<std::size_t>));
+ if (*entry_hash == probe_hash) {
+ // Equal hashes: let the key manager confirm that the keys are equal.
+ if (key_manager_.compositeKeyCollisionCheck(key, entry)) {
+ return true;
+ }
+ }
+ chain_ref = next_ref->load(std::memory_order_relaxed);
+ }
+ // End of chain reached without finding the key.
+ return false;
+}
+
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
void SeparateChainingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
::resize(const std::size_t extra_buckets,
const std::size_t extra_variable_storage,
diff --git a/storage/SimpleScalarSeparateChainingHashTable.hpp b/storage/SimpleScalarSeparateChainingHashTable.hpp
index b2d894d..1619e34 100644
--- a/storage/SimpleScalarSeparateChainingHashTable.hpp
+++ b/storage/SimpleScalarSeparateChainingHashTable.hpp
@@ -182,6 +182,12 @@
return getNextEntryForKey(key.front(), hash_code, value, entry_num);
}
+ bool hasKey(const TypedValue &key) const override;
+
+ // This table has a single scalar key, so a "composite" key is a vector of
+ // one element; delegate to the scalar lookup exactly as
+ // getNextEntryForCompositeKey() does, instead of reporting every key as
+ // absent.
+ bool hasCompositeKey(const std::vector<TypedValue> &key) const override {
+ DCHECK_EQ(1u, key.size());
+ return hasKey(key.front());
+ }
+
void resize(const std::size_t extra_buckets,
const std::size_t extra_variable_storage,
const std::size_t retry_num = 0) override;
@@ -772,6 +778,37 @@
bool serializable,
bool force_key_copy,
bool allow_duplicate_keys>
+bool SimpleScalarSeparateChainingHashTable<ValueT,
+ resizable,
+ serializable,
+ force_key_copy,
+ allow_duplicate_keys>
+ ::hasKey(const TypedValue &key) const {
+ // An existence test is meaningful whether or not duplicate keys are
+ // allowed, so only require a single scalar key type, as the other
+ // HashTable implementations of hasKey() do.
+ DCHECK_EQ(1u, this->key_types_.size());
+ DCHECK(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+ // Walk the chain hanging off the key's slot. Bucket references are 1-based;
+ // a reference of 0 marks the end of the chain.
+ const std::size_t hash_code = key.getHashScalarLiteral();
+ std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+ while (bucket_ref != 0) {
+ DCHECK_NE(bucket_ref, std::numeric_limits<std::size_t>::max());
+
+ const Bucket &bucket = buckets_[bucket_ref - 1];
+ // Only the stored hash is compared; no separate key comparison is done.
+ // NOTE(review): presumably getHashScalarLiteral() identifies the key
+ // uniquely for the key types this table supports - confirm.
+ if (bucket.hash == hash_code) {
+ // Match located.
+ return true;
+ }
+ bucket_ref = bucket.next.load(std::memory_order_relaxed);
+ }
+
+ // Reached the end of the chain and didn't find a match.
+ return false;
+}
+
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
void SimpleScalarSeparateChainingHashTable<ValueT,
resizable,
serializable,
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index da3bc70..97813e2 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -187,6 +187,17 @@
}
/**
+ * @brief Get the flag vector indicating for each IndexSubBlock
+ * whether it is consistent.
+ *
+ * @return A const reference to this block's own flag vector (not a copy),
+ * holding one consistency flag per IndexSubBlock. Presumably the
+ * flags are in the same order as the IndexSubBlocks themselves -
+ * TODO confirm.
+ */
+ const std::vector<bool>& getIndicesConsistent() const {
+ return indices_consistent_;
+ }
+
+ /**
* @brief Get one of this block's IndexSubBlocks.
*
* @param index_id The ID of the IndexSubBlock. This is simply a serial
@@ -201,6 +212,15 @@
}
/**
+ * @brief Get the IndexSubBlock vector.
+ *
+ * @return A const reference to this block's own vector of IndexSubBlocks
+ * (not a copy).
+ */
+ const PtrVector<IndexSubBlock>& getIndices() const {
+ return indices_;
+ }
+
+ /**
* @brief Insert a single tuple into this block.
*
* @param tuple The tuple to insert.