| /** |
| * Copyright 2011-2015 Quickstep Technologies LLC. |
| * Copyright 2015-2016 Pivotal Software, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| **/ |
| |
| #include "relational_operators/HashJoinOperator.hpp" |
| |
| #include <algorithm> |
| #include <memory> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "catalog/CatalogRelation.hpp" |
| #include "catalog/CatalogRelationSchema.hpp" |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "expressions/predicate/Predicate.hpp" |
| #include "expressions/scalar/Scalar.hpp" |
| #include "query_execution/QueryContext.hpp" |
| #include "query_execution/WorkOrdersContainer.hpp" |
| #include "storage/HashTable.hpp" |
| #include "storage/InsertDestination.hpp" |
| #include "storage/StorageBlock.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "storage/StorageManager.hpp" |
| #include "storage/SubBlocksReference.hpp" |
| #include "storage/TupleReference.hpp" |
| #include "storage/TupleStorageSubBlock.hpp" |
| #include "storage/ValueAccessor.hpp" |
| #include "types/containers/ColumnVectorsValueAccessor.hpp" |
| |
| #include "gflags/gflags.h" |
| #include "glog/logging.h" |
| |
| #include "tmb/id_typedefs.h" |
| |
| using std::unique_ptr; |
| using std::vector; |
| |
| namespace quickstep { |
| |
| namespace { |
| |
| DEFINE_bool(vector_based_joined_tuple_collector, true, |
| "If true, use simple vector-based joined tuple collector in " |
| "hash join, with a final sort pass to group joined tuple pairs " |
| "by inner block. If false, use unordered_map based collector that " |
| "keeps joined pairs grouped by inner block as they are found " |
| "(this latter option has exhibited performance/scaling problems, " |
| "particularly in NUMA contexts)."); |
| |
| // Functor passed to HashTable::getAllFromValueAccessor() to collect matching |
| // tuples from the inner relation. This version stores matching tuple ID pairs |
| // in an unordered_map keyed by inner block ID. |
| // |
| // NOTE(chasseur): Performance testing has shown that this particular |
| // implementation has problems scaling in a multisocket NUMA machine. |
| // Additional benchmarking revealed problems using the STL unordered_map class |
| // in a NUMA system (at least for the implementation in GNU libstdc++), even |
| // though instances of this class and the internal unordered_map are private to |
| // a single thread. Because of this, VectorBasedJoinedTupleCollector is used by |
| // default instead. |
| class MapBasedJoinedTupleCollector { |
| public: |
| MapBasedJoinedTupleCollector() { |
| } |
| |
| template <typename ValueAccessorT> |
| inline void operator()(const ValueAccessorT &accessor, |
| const TupleReference &tref) { |
| joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition()); |
| } |
| |
| // Consolidation is a no-op for this version, but we provide this trivial |
| // call so that MapBasedJoinedTupleCollector and |
| // VectorBasedJoinedTupleCollector have the same interface and can both be |
| // used in the templated HashInnerJoinWorkOrder::executeWithCollectorType() method. |
| inline void consolidate() const { |
| } |
| |
| // Get a mutable pointer to the collected map of joined tuple ID pairs. The |
| // key is inner block_id, values are vectors of joined tuple ID pairs with |
| // tuple ID from the inner block on the left and the outer block on the |
| // right. |
| inline std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>>* |
| getJoinedTuples() { |
| return &joined_tuples_; |
| } |
| |
| private: |
| // NOTE(chasseur): It would also be possible to represent joined tuples for a |
| // particular pair of blocks as a TupleIdSequence/BitVector over the |
| // cross-product of all tuples from both blocks, but simply using pairs of |
| // tuple-IDs is expected to be more space efficient if the result set is less |
| // than 1/64 the cardinality of the cross-product. |
| std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_; |
| }; |
| |
| // Compare std::pair instances based on their first element only. |
| template <typename PairT> |
| inline bool CompareFirst(const PairT &left, const PairT &right) { |
| return left.first < right.first; |
| } |
| |
| // Functor passed to HashTable::getAllFromValueAccessor() to collect matching |
| // tuples from the inner relation. This version stores inner block ID and pairs |
| // of joined tuple IDs in an unsorted vector, which should then be sorted with |
| // a call to consolidate() before materializing join output. |
| // |
| // NOTE(chasseur): Because of NUMA scaling issues for |
| // MapBasedJoinedTupleCollector noted above, this implementation is the |
| // default. |
| class VectorBasedJoinedTupleCollector { |
| public: |
| VectorBasedJoinedTupleCollector() { |
| } |
| |
| template <typename ValueAccessorT> |
| inline void operator()(const ValueAccessorT &accessor, |
| const TupleReference &tref) { |
| joined_tuples_.emplace_back(tref.block, |
| std::make_pair(tref.tuple, accessor.getCurrentPosition())); |
| } |
| |
| // Sorts joined tuple pairs by inner block ID. Must be called before |
| // getJoinedTuples(). |
| void consolidate() { |
| if (joined_tuples_.empty()) { |
| return; |
| } |
| |
| // Sort joined tuple_id pairs by inner block_id. |
| std::sort(joined_tuples_.begin(), |
| joined_tuples_.end(), |
| CompareFirst<std::pair<block_id, std::pair<tuple_id, tuple_id>>>); |
| |
| // Make a single vector of joined block_id pairs for each inner block for |
| // compatibility with other join-related APIs. |
| consolidated_joined_tuples_.emplace_back(joined_tuples_.front().first, |
| std::vector<std::pair<tuple_id, tuple_id>>()); |
| |
| for (const std::pair<block_id, std::pair<tuple_id, tuple_id>> &match_entry |
| : joined_tuples_) { |
| if (match_entry.first == consolidated_joined_tuples_.back().first) { |
| consolidated_joined_tuples_.back().second.emplace_back(match_entry.second); |
| } else { |
| consolidated_joined_tuples_.emplace_back( |
| match_entry.first, |
| std::vector<std::pair<tuple_id, tuple_id>>(1, match_entry.second)); |
| } |
| } |
| } |
| |
| // Get a mutable pointer to the collected joined tuple ID pairs. The returned |
| // vector has a single entry for each inner block where there are matching |
| // joined tuples (the inner block's ID is the first element of the pair). The |
| // second element of each pair is another vector consisting of pairs of |
| // joined tuple IDs (tuple ID from inner block on the left, from outer block |
| // on the right). |
| inline std::vector<std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>>* |
| getJoinedTuples() { |
| return &consolidated_joined_tuples_; |
| } |
| |
| private: |
| // Unsorted vector of join matches that is appended to by call operator(). |
| std::vector<std::pair<block_id, std::pair<tuple_id, tuple_id>>> joined_tuples_; |
| |
| // Joined tuples sorted by inner block_id. consolidate() populates this from |
| // 'joined_tuples_'. |
| std::vector<std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>> |
| consolidated_joined_tuples_; |
| }; |
| |
| class SemiAntiJoinTupleCollector { |
| public: |
| explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) { |
| filter_.reset(tuple_store.getExistenceMap()); |
| } |
| |
| template <typename ValueAccessorT> |
| inline void operator()(const ValueAccessorT &accessor) { |
| filter_->set(accessor.getCurrentPosition(), false); |
| } |
| |
| const TupleIdSequence* filter() const { |
| return filter_.get(); |
| } |
| |
| private: |
| std::unique_ptr<TupleIdSequence> filter_; |
| }; |
| |
| } // namespace |
| |
| bool HashJoinOperator::getAllWorkOrders( |
| WorkOrdersContainer *container, |
| QueryContext *query_context, |
| StorageManager *storage_manager, |
| const tmb::client_id scheduler_client_id, |
| tmb::MessageBus *bus) { |
| switch (join_type_) { |
| case JoinType::kInnerJoin: |
| return getAllNonOuterJoinWorkOrders<HashInnerJoinWorkOrder>( |
| container, query_context, storage_manager); |
| case JoinType::kLeftSemiJoin: |
| return getAllNonOuterJoinWorkOrders<HashSemiJoinWorkOrder>( |
| container, query_context, storage_manager); |
| case JoinType::kLeftAntiJoin: |
| return getAllNonOuterJoinWorkOrders<HashAntiJoinWorkOrder>( |
| container, query_context, storage_manager); |
| default: |
| LOG(FATAL) << "Unknown join type in HashJoinOperator::getAllWorkOrders()"; |
| } |
| } |
| |
| template <class JoinWorkOrderClass> |
| bool HashJoinOperator::getAllNonOuterJoinWorkOrders( |
| WorkOrdersContainer *container, |
| QueryContext *query_context, |
| StorageManager *storage_manager) { |
| // We wait until the building of global hash table is complete. |
| if (blocking_dependencies_met_) { |
| DCHECK(query_context != nullptr); |
| |
| const Predicate *residual_predicate = |
| query_context->getPredicate(residual_predicate_index_); |
| const vector<unique_ptr<const Scalar>> &selection = |
| query_context->getScalarGroup(selection_index_); |
| InsertDestination *output_destination = |
| query_context->getInsertDestination(output_destination_index_); |
| const JoinHashTable &hash_table = |
| *(query_context->getJoinHashTable(hash_table_index_)); |
| |
| if (probe_relation_is_stored_) { |
| if (!started_) { |
| for (const block_id probe_block_id : probe_relation_block_ids_) { |
| container->addNormalWorkOrder( |
| new JoinWorkOrderClass(build_relation_, |
| probe_relation_, |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| probe_block_id, |
| residual_predicate, |
| selection, |
| hash_table, |
| output_destination, |
| storage_manager), |
| op_index_); |
| } |
| started_ = true; |
| } |
| return started_; |
| } else { |
| while (num_workorders_generated_ < probe_relation_block_ids_.size()) { |
| container->addNormalWorkOrder( |
| new JoinWorkOrderClass(build_relation_, |
| probe_relation_, |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| probe_relation_block_ids_[num_workorders_generated_], |
| residual_predicate, |
| selection, |
| hash_table, |
| output_destination, |
| storage_manager), |
| op_index_); |
| ++num_workorders_generated_; |
| } // end while |
| return done_feeding_input_relation_; |
| } // end else (probe_relation_is_stored_) |
| } // end if (blocking_dependencies_met_) |
| return false; |
| } |
| |
| void HashInnerJoinWorkOrder::execute() { |
| if (FLAGS_vector_based_joined_tuple_collector) { |
| executeWithCollectorType<VectorBasedJoinedTupleCollector>(); |
| } else { |
| executeWithCollectorType<MapBasedJoinedTupleCollector>(); |
| } |
| } |
| |
| template <typename CollectorT> |
| void HashInnerJoinWorkOrder::executeWithCollectorType() { |
| BlockReference probe_block( |
| storage_manager_->getBlock(block_id_, probe_relation_)); |
| const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); |
| CollectorT collector; |
| if (join_key_attributes_.size() == 1) { |
| hash_table_.getAllFromValueAccessor( |
| probe_accessor.get(), |
| join_key_attributes_.front(), |
| any_join_key_attributes_nullable_, |
| &collector); |
| } else { |
| hash_table_.getAllFromValueAccessorCompositeKey( |
| probe_accessor.get(), |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| &collector); |
| } |
| collector.consolidate(); |
| |
| const relation_id build_relation_id = build_relation_.getID(); |
| const relation_id probe_relation_id = probe_relation_.getID(); |
| |
| for (std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>> |
| &build_block_entry : *collector.getJoinedTuples()) { |
| BlockReference build_block = |
| storage_manager_->getBlock(build_block_entry.first, build_relation_); |
| const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock(); |
| std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor()); |
| |
| // Evaluate '*residual_predicate_', if any. |
| // |
| // TODO(chasseur): We might consider implementing true vectorized |
| // evaluation for join predicates that are not equijoins (although in |
| // general that would require evaluating and materializing some expressions |
| // over the cross-product of all tuples in a pair of blocks in order to |
| // evaluate the predicate). We could use a heuristic where we only do the |
| // vectorized materialization and evaluation if the set of matches from the |
| // hash join is below a reasonable threshold so that we don't blow up |
| // temporary memory requirements to an unreasonable degree. |
| if (residual_predicate_ != nullptr) { |
| std::vector<std::pair<tuple_id, tuple_id>> filtered_matches; |
| |
| for (const std::pair<tuple_id, tuple_id> &hash_match |
| : build_block_entry.second) { |
| if (residual_predicate_->matchesForJoinedTuples(*build_accessor, |
| build_relation_id, |
| hash_match.first, |
| *probe_accessor, |
| probe_relation_id, |
| hash_match.second)) { |
| filtered_matches.emplace_back(hash_match); |
| } |
| } |
| |
| build_block_entry.second = std::move(filtered_matches); |
| } |
| |
| // TODO(chasseur): If all the output expressions are ScalarAttributes, |
| // we could implement a similar fast-path to StorageBlock::selectSimple() |
| // that avoids a copy. |
| // |
| // TODO(chasseur): See TODO in NestedLoopsJoinOperator.cpp about limiting |
| // the size of materialized temporary results. In common usage, this |
| // probably won't be an issue for hash-joins, but in the worst case a hash |
| // join can still devolve into a cross-product. |
| // |
| // NOTE(chasseur): We could also create one big ColumnVectorsValueAccessor |
| // and accumulate all the results across multiple block pairs into it |
| // before inserting anything into output blocks, but this would require |
| // some significant API extensions to the expressions system for a dubious |
| // benefit (probably only a real performance win when there are very few |
| // matching tuples in each individual inner block but very many inner |
| // blocks with at least one match). |
| ColumnVectorsValueAccessor temp_result; |
| for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin(); |
| selection_cit != selection_.end(); |
| ++selection_cit) { |
| temp_result.addColumn((*selection_cit)->getAllValuesForJoin(build_relation_id, |
| build_accessor.get(), |
| probe_relation_id, |
| probe_accessor.get(), |
| build_block_entry.second)); |
| } |
| |
| // NOTE(chasseur): calling the bulk-insert method of InsertDestination once |
| // for each pair of joined blocks incurs some extra overhead that could be |
| // avoided by keeping checked-out MutableBlockReferences across iterations |
| // of this loop, but that would get messy when combined with partitioning. |
| output_destination_->bulkInsertTuples(&temp_result); |
| } |
| } |
| |
| void HashSemiJoinWorkOrder::execute() { |
| if (residual_predicate_ == nullptr) { |
| executeWithoutResidualPredicate(); |
| } else { |
| executeWithResidualPredicate(); |
| } |
| } |
| |
| void HashSemiJoinWorkOrder::executeWithResidualPredicate() { |
| const relation_id build_relation_id = build_relation_.getID(); |
| const relation_id probe_relation_id = probe_relation_.getID(); |
| |
| BlockReference probe_block = storage_manager_->getBlock(block_id_, |
| probe_relation_); |
| const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); |
| |
| // TODO(harshad) - Make this function work with both types of collectors. |
| |
| // We collect all the matching probe relation tuples, as there's a residual |
| // preidcate that needs to be applied after collecting these matches. |
| MapBasedJoinedTupleCollector collector; |
| if (join_key_attributes_.size() == 1) { |
| hash_table_.getAllFromValueAccessor( |
| probe_accessor.get(), |
| join_key_attributes_.front(), |
| any_join_key_attributes_nullable_, |
| &collector); |
| } else { |
| hash_table_.getAllFromValueAccessorCompositeKey( |
| probe_accessor.get(), |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| &collector); |
| } |
| |
| // Get a filter for tuples in the given probe block. |
| TupleIdSequence filter(probe_store.getMaxTupleID() + 1); |
| filter.setRange(0, filter.length(), false); |
| for (const std::pair<const block_id, |
| std::vector<std::pair<tuple_id, tuple_id>>> |
| &build_block_entry : *collector.getJoinedTuples()) { |
| // First element of the pair build_block_entry is the build block ID |
| // 2nd element of the pair is a vector of pairs, in each of which - |
| // 1st element is a matching tuple ID from the inner (build) relation. |
| // 2nd element is a matching tuple ID from the outer (probe) relation. |
| |
| // Get the block from the build relation for this pair of matched tuples. |
| BlockReference build_block = |
| storage_manager_->getBlock(build_block_entry.first, build_relation_); |
| const TupleStorageSubBlock &build_store = |
| build_block->getTupleStorageSubBlock(); |
| std::unique_ptr<ValueAccessor> build_accessor( |
| build_store.createValueAccessor()); |
| for (const std::pair<tuple_id, tuple_id> &hash_match |
| : build_block_entry.second) { |
| // For each pair, 1st element is a tuple ID from the build relation in the |
| // given build block, 2nd element is a tuple ID from the probe relation. |
| if (filter.get(hash_match.second)) { |
| // We have already found matches for this tuple that belongs to the |
| // probe side, skip it. |
| continue; |
| } |
| if (residual_predicate_->matchesForJoinedTuples(*build_accessor, |
| build_relation_id, |
| hash_match.first, |
| *probe_accessor, |
| probe_relation_id, |
| hash_match.second)) { |
| filter.set(hash_match.second, true); |
| } |
| } |
| } |
| |
| SubBlocksReference sub_blocks_ref(probe_store, |
| probe_block->getIndices(), |
| probe_block->getIndicesConsistent()); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor_with_filter( |
| probe_store.createValueAccessor(&filter)); |
| ColumnVectorsValueAccessor temp_result; |
| for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); |
| selection_it != selection_.end(); |
| ++selection_it) { |
| temp_result.addColumn((*selection_it)->getAllValues( |
| probe_accessor_with_filter.get(), &sub_blocks_ref)); |
| } |
| |
| output_destination_->bulkInsertTuples(&temp_result); |
| } |
| |
| void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() { |
| DCHECK(residual_predicate_ == nullptr); |
| |
| BlockReference probe_block = storage_manager_->getBlock(block_id_, |
| probe_relation_); |
| const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); |
| SemiAntiJoinTupleCollector collector(probe_store); |
| // We collect all the probe relation tuples which have at least one matching |
| // tuple in the build relation. As a performance optimization, the hash table |
| // just looks for the existence of the probing key in the hash table and sets |
| // the bit for the probing key in the collector. The optimization works |
| // because there is no residual predicate in this case, unlike |
| // executeWithResidualPredicate(). |
| if (join_key_attributes_.size() == 1u) { |
| // Call the collector to set the bit to 0 for every key without a match. |
| hash_table_.runOverKeysFromValueAccessorIfMatchNotFound( |
| probe_accessor.get(), |
| join_key_attributes_.front(), |
| any_join_key_attributes_nullable_, |
| &collector); |
| } else { |
| // Call the collector to set the bit to 0 for every key without a match. |
| hash_table_.runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey( |
| probe_accessor.get(), |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| &collector); |
| } |
| |
| SubBlocksReference sub_blocks_ref(probe_store, |
| probe_block->getIndices(), |
| probe_block->getIndicesConsistent()); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor_with_filter( |
| probe_store.createValueAccessor(collector.filter())); |
| ColumnVectorsValueAccessor temp_result; |
| for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); |
| selection_it != selection_.end(); ++selection_it) { |
| temp_result.addColumn((*selection_it)->getAllValues( |
| probe_accessor_with_filter.get(), &sub_blocks_ref)); |
| } |
| |
| output_destination_->bulkInsertTuples(&temp_result); |
| } |
| |
| void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() { |
| DCHECK(residual_predicate_ == nullptr); |
| |
| BlockReference probe_block = storage_manager_->getBlock(block_id_, |
| probe_relation_); |
| const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); |
| SemiAntiJoinTupleCollector collector(probe_store); |
| // We probe the hash table to find the keys which have an entry in the |
| // hash table. |
| if (join_key_attributes_.size() == 1) { |
| // Call the collector to set the bit to 0 for every key with a match. |
| hash_table_.runOverKeysFromValueAccessorIfMatchFound( |
| probe_accessor.get(), |
| join_key_attributes_.front(), |
| any_join_key_attributes_nullable_, |
| &collector); |
| } else { |
| // Call the collector to set the bit to 0 for every key with a match. |
| hash_table_.runOverKeysFromValueAccessorIfMatchFoundCompositeKey( |
| probe_accessor.get(), |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| &collector); |
| } |
| |
| SubBlocksReference sub_blocks_ref(probe_store, |
| probe_block->getIndices(), |
| probe_block->getIndicesConsistent()); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor_with_filter( |
| probe_store.createValueAccessor(collector.filter())); |
| ColumnVectorsValueAccessor temp_result; |
| for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); |
| selection_it != selection_.end(); ++selection_it) { |
| temp_result.addColumn((*selection_it)->getAllValues( |
| probe_accessor_with_filter.get(), &sub_blocks_ref)); |
| } |
| |
| output_destination_->bulkInsertTuples(&temp_result); |
| } |
| |
| void HashAntiJoinWorkOrder::executeWithResidualPredicate() { |
| const relation_id build_relation_id = build_relation_.getID(); |
| const relation_id probe_relation_id = probe_relation_.getID(); |
| |
| BlockReference probe_block = storage_manager_->getBlock(block_id_, |
| probe_relation_); |
| const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); |
| // TODO(harshad) - Make the following code work with both types of collectors. |
| MapBasedJoinedTupleCollector collector; |
| // We probe the hash table and get all the matches. Unlike |
| // executeWithoutResidualPredicate(), we have to collect all the matching |
| // tuples, because after this step we still have to evalute the residual |
| // predicate. |
| if (join_key_attributes_.size() == 1) { |
| hash_table_.getAllFromValueAccessor( |
| probe_accessor.get(), |
| join_key_attributes_.front(), |
| any_join_key_attributes_nullable_, |
| &collector); |
| } else { |
| hash_table_.getAllFromValueAccessorCompositeKey( |
| probe_accessor.get(), |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| &collector); |
| } |
| |
| // Create a filter for all the tuples from the given probe block. |
| std::unique_ptr<TupleIdSequence> filter(probe_store.getExistenceMap()); |
| for (const std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>> |
| &build_block_entry : *collector.getJoinedTuples()) { |
| // First element of the pair build_block_entry is the build block ID |
| // 2nd element of the pair is a vector of pairs, in each of which - |
| // 1st element is a matching tuple ID from the inner (build) relation. |
| // 2nd element is a matching tuple ID from the outer (probe) relation. |
| BlockReference build_block = storage_manager_->getBlock(build_block_entry.first, |
| build_relation_); |
| const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock(); |
| std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor()); |
| for (const std::pair<tuple_id, tuple_id> &hash_match |
| : build_block_entry.second) { |
| if (!filter->get(hash_match.second)) { |
| // We have already seen this tuple, skip it. |
| continue; |
| } |
| if (residual_predicate_->matchesForJoinedTuples(*build_accessor, |
| build_relation_id, |
| hash_match.first, |
| *probe_accessor, |
| probe_relation_id, |
| hash_match.second)) { |
| // Note that the filter marks a match as false, as needed by the anti |
| // join definition. |
| filter->set(hash_match.second, false); |
| } |
| } |
| } |
| |
| SubBlocksReference sub_blocks_ref(probe_store, |
| probe_block->getIndices(), |
| probe_block->getIndicesConsistent()); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor_with_filter( |
| probe_store.createValueAccessor(filter.get())); |
| ColumnVectorsValueAccessor temp_result; |
| for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin(); |
| selection_it != selection_.end(); |
| ++selection_it) { |
| temp_result.addColumn((*selection_it)->getAllValues(probe_accessor_with_filter.get(), |
| &sub_blocks_ref)); |
| } |
| |
| output_destination_->bulkInsertTuples(&temp_result); |
| } |
| |
| } // namespace quickstep |