| /** |
| * Copyright 2011-2015 Quickstep Technologies LLC. |
| * Copyright 2015-2016 Pivotal Software, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| **/ |
| |
| #include "relational_operators/HashJoinOperator.hpp" |
| |
| #include <algorithm> |
| #include <memory> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "catalog/CatalogRelation.hpp" |
| #include "catalog/CatalogRelationSchema.hpp" |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "expressions/predicate/Predicate.hpp" |
| #include "expressions/scalar/Scalar.hpp" |
| #include "query_execution/QueryContext.hpp" |
| #include "query_execution/WorkOrdersContainer.hpp" |
| #include "storage/HashTable.hpp" |
| #include "storage/InsertDestination.hpp" |
| #include "storage/StorageBlock.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "storage/StorageManager.hpp" |
| #include "storage/TupleReference.hpp" |
| #include "storage/TupleStorageSubBlock.hpp" |
| #include "storage/ValueAccessor.hpp" |
| #include "types/containers/ColumnVectorsValueAccessor.hpp" |
| |
| #include "gflags/gflags.h" |
| #include "glog/logging.h" |
| |
| #include "tmb/id_typedefs.h" |
| |
| using std::unique_ptr; |
| using std::vector; |
| |
| namespace quickstep { |
| |
| namespace { |
| |
| DEFINE_bool(vector_based_joined_tuple_collector, true, |
| "If true, use simple vector-based joined tuple collector in " |
| "hash join, with a final sort pass to group joined tuple pairs " |
| "by inner block. If false, use unordered_map based collector that " |
| "keeps joined pairs grouped by inner block as they are found " |
| "(this latter option has exhibited performance/scaling problems, " |
| "particularly in NUMA contexts)."); |
| |
| // Functor passed to HashTable::getAllFromValueAccessor() to collect matching |
| // tuples from the inner relation. This version stores matching tuple ID pairs |
| // in an unordered_map keyed by inner block ID. |
| // |
| // NOTE(chasseur): Performance testing has shown that this particular |
| // implementation has problems scaling in a multisocket NUMA machine. |
| // Additional benchmarking revealed problems using the STL unordered_map class |
| // in a NUMA system (at least for the implementation in GNU libstdc++), even |
| // though instances of this class and the internal unordered_map are private to |
| // a single thread. Because of this, VectorBasedJoinedTupleCollector is used by |
| // default instead. |
| class MapBasedJoinedTupleCollector { |
| public: |
| MapBasedJoinedTupleCollector() { |
| } |
| |
| template <typename ValueAccessorT> |
| inline void operator()(const ValueAccessorT &accessor, |
| const TupleReference &tref) { |
| joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition()); |
| } |
| |
| // Consolidation is a no-op for this version, but we provide this trivial |
| // call so that MapBasedJoinedTupleCollector and |
| // VectorBasedJoinedTupleCollector have the same interface and can both be |
| // used in the templated HashJoinWorkOrder::executeWithCollectorType() method. |
| inline void consolidate() const { |
| } |
| |
| // Get a mutable pointer to the collected map of joined tuple ID pairs. The |
| // key is inner block_id, values are vectors of joined tuple ID pairs with |
| // tuple ID from the inner block on the left and the outer block on the |
| // right. |
| inline std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>>* |
| getJoinedTuples() { |
| return &joined_tuples_; |
| } |
| |
| private: |
| // NOTE(chasseur): It would also be possible to represent joined tuples for a |
| // particular pair of blocks as a TupleIdSequence/BitVector over the |
| // cross-product of all tuples from both blocks, but simply using pairs of |
| // tuple-IDs is expected to be more space efficient if the result set is less |
| // than 1/64 the cardinality of the cross-product. |
| std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_; |
| }; |
| |
| // Compare std::pair instances based on their first element only. |
| template <typename PairT> |
| inline bool CompareFirst(const PairT &left, const PairT &right) { |
| return left.first < right.first; |
| } |
| |
| // Functor passed to HashTable::getAllFromValueAccessor() to collect matching |
| // tuples from the inner relation. This version stores inner block ID and pairs |
| // of joined tuple IDs in an unsorted vector, which should then be sorted with |
| // a call to consolidate() before materializing join output. |
| // |
| // NOTE(chasseur): Because of NUMA scaling issues for |
| // MapBasedJoinedTupleCollector noted above, this implementation is the |
| // default. |
| class VectorBasedJoinedTupleCollector { |
| public: |
| VectorBasedJoinedTupleCollector() { |
| } |
| |
| template <typename ValueAccessorT> |
| inline void operator()(const ValueAccessorT &accessor, |
| const TupleReference &tref) { |
| joined_tuples_.emplace_back(tref.block, |
| std::make_pair(tref.tuple, accessor.getCurrentPosition())); |
| } |
| |
| // Sorts joined tuple pairs by inner block ID. Must be called before |
| // getJoinedTuples(). |
| void consolidate() { |
| if (joined_tuples_.empty()) { |
| return; |
| } |
| |
| // Sort joined tuple_id pairs by inner block_id. |
| std::sort(joined_tuples_.begin(), |
| joined_tuples_.end(), |
| CompareFirst<std::pair<block_id, std::pair<tuple_id, tuple_id>>>); |
| |
| // Make a single vector of joined block_id pairs for each inner block for |
| // compatibility with other join-related APIs. |
| consolidated_joined_tuples_.emplace_back(joined_tuples_.front().first, |
| std::vector<std::pair<tuple_id, tuple_id>>()); |
| |
| for (const std::pair<block_id, std::pair<tuple_id, tuple_id>> &match_entry |
| : joined_tuples_) { |
| if (match_entry.first == consolidated_joined_tuples_.back().first) { |
| consolidated_joined_tuples_.back().second.emplace_back(match_entry.second); |
| } else { |
| consolidated_joined_tuples_.emplace_back( |
| match_entry.first, |
| std::vector<std::pair<tuple_id, tuple_id>>(1, match_entry.second)); |
| } |
| } |
| } |
| |
| // Get a mutable pointer to the collected joined tuple ID pairs. The returned |
| // vector has a single entry for each inner block where there are matching |
| // joined tuples (the inner block's ID is the first element of the pair). The |
| // second element of each pair is another vector consisting of pairs of |
| // joined tuple IDs (tuple ID from inner block on the left, from outer block |
| // on the right). |
| inline std::vector<std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>>* |
| getJoinedTuples() { |
| return &consolidated_joined_tuples_; |
| } |
| |
| private: |
| // Unsorted vector of join matches that is appended to by call operator(). |
| std::vector<std::pair<block_id, std::pair<tuple_id, tuple_id>>> joined_tuples_; |
| |
| // Joined tuples sorted by inner block_id. consolidate() populates this from |
| // 'joined_tuples_'. |
| std::vector<std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>> |
| consolidated_joined_tuples_; |
| }; |
| |
| } // namespace |
| |
| bool HashJoinOperator::getAllWorkOrders( |
| WorkOrdersContainer *container, |
| QueryContext *query_context, |
| StorageManager *storage_manager, |
| const tmb::client_id foreman_client_id, |
| const tmb::client_id agent_client_id, |
| tmb::MessageBus *bus) { |
| // We wait until the building of global hash table is complete. |
| if (blocking_dependencies_met_) { |
| DCHECK(query_context != nullptr); |
| |
| const Predicate *residual_predicate = query_context->getPredicate(residual_predicate_index_); |
| const vector<unique_ptr<const Scalar>> &selection = |
| query_context->getScalarGroup(selection_index_); |
| InsertDestination *output_destination = |
| query_context->getInsertDestination(output_destination_index_); |
| JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_); |
| |
| if (probe_relation_is_stored_) { |
| if (!started_) { |
| for (const block_id probe_block_id : probe_relation_block_ids_) { |
| container->addNormalWorkOrder( |
| new HashJoinWorkOrder(build_relation_, |
| probe_relation_, |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| probe_block_id, |
| residual_predicate, |
| selection, |
| output_destination, |
| hash_table, |
| storage_manager), |
| op_index_); |
| } |
| started_ = true; |
| } |
| return started_; |
| } else { |
| while (num_workorders_generated_ < probe_relation_block_ids_.size()) { |
| container->addNormalWorkOrder( |
| new HashJoinWorkOrder( |
| build_relation_, |
| probe_relation_, |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| probe_relation_block_ids_[num_workorders_generated_], |
| residual_predicate, |
| selection, |
| output_destination, |
| hash_table, |
| storage_manager), |
| op_index_); |
| ++num_workorders_generated_; |
| } // end while |
| return done_feeding_input_relation_; |
| } // end else (input_relation_is_stored is false) |
| } // end if (blocking_dependencies_met) |
| return false; |
| } |
| |
| void HashJoinWorkOrder::execute() { |
| if (FLAGS_vector_based_joined_tuple_collector) { |
| executeWithCollectorType<VectorBasedJoinedTupleCollector>(); |
| } else { |
| executeWithCollectorType<MapBasedJoinedTupleCollector>(); |
| } |
| } |
| |
| template <typename CollectorT> |
| void HashJoinWorkOrder::executeWithCollectorType() { |
| BlockReference probe_block( |
| storage_manager_->getBlock(block_id_, probe_relation_)); |
| const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); |
| |
| std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor()); |
| CollectorT collector; |
| if (join_key_attributes_.size() == 1) { |
| hash_table_->getAllFromValueAccessor( |
| probe_accessor.get(), |
| join_key_attributes_.front(), |
| any_join_key_attributes_nullable_, |
| &collector); |
| } else { |
| hash_table_->getAllFromValueAccessorCompositeKey( |
| probe_accessor.get(), |
| join_key_attributes_, |
| any_join_key_attributes_nullable_, |
| &collector); |
| } |
| collector.consolidate(); |
| |
| const relation_id build_relation_id = build_relation_.getID(); |
| const relation_id probe_relation_id = probe_relation_.getID(); |
| |
| for (std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>> |
| &build_block_entry : *collector.getJoinedTuples()) { |
| BlockReference build_block = |
| storage_manager_->getBlock(build_block_entry.first, build_relation_); |
| const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock(); |
| std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor()); |
| |
| // Evaluate '*residual_predicate_', if any. |
| // |
| // TODO(chasseur): We might consider implementing true vectorized |
| // evaluation for join predicates that are not equijoins (although in |
| // general that would require evaluating and materializing some expressions |
| // over the cross-product of all tuples in a pair of blocks in order to |
| // evaluate the predicate). We could use a heuristic where we only do the |
| // vectorized materialization and evaluation if the set of matches from the |
| // hash join is below a reasonable threshold so that we don't blow up |
| // temporary memory requirements to an unreasonable degree. |
| if (residual_predicate_ != nullptr) { |
| std::vector<std::pair<tuple_id, tuple_id>> filtered_matches; |
| |
| for (const std::pair<tuple_id, tuple_id> &hash_match |
| : build_block_entry.second) { |
| if (residual_predicate_->matchesForJoinedTuples(*build_accessor, |
| build_relation_id, |
| hash_match.first, |
| *probe_accessor, |
| probe_relation_id, |
| hash_match.second)) { |
| filtered_matches.emplace_back(hash_match); |
| } |
| } |
| |
| build_block_entry.second = std::move(filtered_matches); |
| } |
| |
| // TODO(chasseur): If all the output expressions are ScalarAttributes, |
| // we could implement a similar fast-path to StorageBlock::selectSimple() |
| // that avoids a copy. |
| // |
| // TODO(chasseur): See TODO in NestedLoopsJoinOperator.cpp about limiting |
| // the size of materialized temporary results. In common usage, this |
| // probably won't be an issue for hash-joins, but in the worst case a hash |
| // join can still devolve into a cross-product. |
| // |
| // NOTE(chasseur): We could also create one big ColumnVectorsValueAccessor |
| // and accumulate all the results across multiple block pairs into it |
| // before inserting anything into output blocks, but this would require |
| // some significant API extensions to the expressions system for a dubious |
| // benefit (probably only a real performance win when there are very few |
| // matching tuples in each individual inner block but very many inner |
| // blocks with at least one match). |
| ColumnVectorsValueAccessor temp_result; |
| for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin(); |
| selection_cit != selection_.end(); |
| ++selection_cit) { |
| temp_result.addColumn((*selection_cit)->getAllValuesForJoin(build_relation_id, |
| build_accessor.get(), |
| probe_relation_id, |
| probe_accessor.get(), |
| build_block_entry.second)); |
| } |
| |
| // NOTE(chasseur): calling the bulk-insert method of InsertDestination once |
| // for each pair of joined blocks incurs some extra overhead that could be |
| // avoided by keeping checked-out MutableBlockReferences across iterations |
| // of this loop, but that would get messy when combined with partitioning. |
| output_destination_->bulkInsertTuples(&temp_result); |
| } |
| } |
| |
| } // namespace quickstep |