| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #ifndef QUICKSTEP_RELATIONAL_OPERATORS_NESTED_LOOPS_JOIN_OPERATOR_HPP_ |
| #define QUICKSTEP_RELATIONAL_OPERATORS_NESTED_LOOPS_JOIN_OPERATOR_HPP_ |
| |
| #include <cstddef> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include "catalog/CatalogRelation.hpp" |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "catalog/PartitionScheme.hpp" |
| #include "catalog/PartitionSchemeHeader.hpp" |
| #include "query_execution/QueryContext.hpp" |
| #include "relational_operators/RelationalOperator.hpp" |
| #include "relational_operators/WorkOrder.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "utility/Macros.hpp" |
| |
| #include "glog/logging.h" |
| |
| #include "tmb/id_typedefs.h" |
| |
| namespace tmb { class MessageBus; } |
| |
| namespace quickstep { |
| |
| class CatalogRelationSchema; |
| class InsertDestination; |
| class Predicate; |
| class Scalar; |
| class StorageManager; |
| class TupleStorageSubBlock; |
| class WorkOrderProtosContainer; |
| class WorkOrdersContainer; |
| |
| namespace serialization { class WorkOrder; } |
| |
| /** \addtogroup RelationalOperators |
| * @{ |
| */ |
| |
| /** |
| * @brief An operator which performs a simple (SLOW) nested-loops join on two |
| * relations. |
| **/ |
| class NestedLoopsJoinOperator : public RelationalOperator { |
| public: |
| /** |
| * @brief Constructor. |
| * |
| * @param query_id The ID of the query to which this operator belongs. |
| * @param nested_loops_join_index The ID of this operator. |
| * @param left_input_relation The first relation in the join (order is not |
| * actually important). |
| * @param right_input_relation The second relation in the join (order is not |
| * actually important). |
| * @param num_partitions The number of partitions. |
| * @param has_repartition Whether this operator does repartition. |
| * @param output_relation The output relation. |
| * @param output_destination_index The index of the InsertDestination in the |
| * QueryContext to insert the join results. |
| * @param join_predicate_index The index of join predicate in QueryContext to |
| * evaluate for each pair of tuples in the input relations. |
| * (cannot be kInvalidPredicateId). |
| * @param selection_index The group index of Scalars in QueryContext, |
| * corresponding to the attributes of the relation referred by |
| * output_relation_id. Each Scalar is evaluated for the joined tuples, |
| * and the resulting value is inserted into the join result. |
| * @param left_relation_is_stored If left_input_relation is a stored relation. |
| * @param right_relation_is_stored If right_input_relation is a stored |
| * relation. |
| **/ |
| NestedLoopsJoinOperator( |
| const std::size_t query_id, |
| const std::size_t nested_loops_join_index, |
| const CatalogRelation &left_input_relation, |
| const CatalogRelation &right_input_relation, |
| const std::size_t num_partitions, |
| const bool has_repartition, |
| const CatalogRelation &output_relation, |
| const QueryContext::insert_destination_id output_destination_index, |
| const QueryContext::predicate_id join_predicate_index, |
| const QueryContext::scalar_group_id selection_index, |
| const bool left_relation_is_stored, |
| const bool right_relation_is_stored) |
| : RelationalOperator(query_id, num_partitions, has_repartition, output_relation.getNumPartitions()), |
| nested_loops_join_index_(nested_loops_join_index), |
| left_input_relation_(left_input_relation), |
| right_input_relation_(right_input_relation), |
| output_relation_(output_relation), |
| output_destination_index_(output_destination_index), |
| join_predicate_index_(join_predicate_index), |
| selection_index_(selection_index), |
| left_relation_is_stored_(left_relation_is_stored), |
| right_relation_is_stored_(right_relation_is_stored), |
| left_relation_block_ids_(num_partitions), |
| right_relation_block_ids_(num_partitions), |
| num_left_workorders_generated_(num_partitions), |
| num_right_workorders_generated_(num_partitions), |
| done_feeding_left_relation_(false), |
| done_feeding_right_relation_(false), |
| all_workorders_generated_(false) { |
| DCHECK_NE(join_predicate_index_, QueryContext::kInvalidPredicateId); |
| |
| if (left_relation_is_stored) { |
| if (left_input_relation_.hasPartitionScheme()) { |
| const PartitionScheme &part_scheme = *left_input_relation_.getPartitionScheme(); |
| DCHECK_EQ(num_partitions_, part_scheme.getPartitionSchemeHeader().getNumPartitions()); |
| for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { |
| left_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id); |
| } |
| } else { |
| DCHECK_EQ(1u, num_partitions_); |
| left_relation_block_ids_[0] = left_input_relation_.getBlocksSnapshot(); |
| } |
| } |
| |
| if (right_relation_is_stored) { |
| if (right_input_relation_.hasPartitionScheme()) { |
| const PartitionScheme &part_scheme = *right_input_relation_.getPartitionScheme(); |
| DCHECK_EQ(num_partitions_, part_scheme.getPartitionSchemeHeader().getNumPartitions()); |
| for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { |
| right_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id); |
| } |
| } else { |
| // Broadcast right (smaller) side upon partitioned nlj. |
| for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) { |
| right_relation_block_ids_[part_id] = right_input_relation_.getBlocksSnapshot(); |
| } |
| } |
| } |
| } |
| |
| ~NestedLoopsJoinOperator() override {} |
| |
| OperatorType getOperatorType() const override { |
| return kNestedLoopsJoin; |
| } |
| |
| std::string getName() const override { |
| return "NestedLoopsJoinOperator"; |
| } |
| |
| bool getAllWorkOrders(WorkOrdersContainer *container, |
| QueryContext *query_context, |
| StorageManager *storage_manager, |
| const tmb::client_id scheduler_client_id, |
| tmb::MessageBus *bus) override; |
| |
| bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; |
| |
| void doneFeedingInputBlocks(const relation_id rel_id) override { |
| if (rel_id == left_input_relation_.getID()) { |
| done_feeding_left_relation_ = true; |
| } else if (rel_id == right_input_relation_.getID()) { |
| done_feeding_right_relation_ = true; |
| } else { |
| FATAL_ERROR("Wrong relation ID in doneFeedingInputBlocks method."); |
| } |
| } |
| |
| void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, |
| const partition_id part_id) override { |
| if (input_relation_id == left_input_relation_.getID()) { |
| left_relation_block_ids_[part_id].push_back(input_block_id); |
| } else if (input_relation_id == right_input_relation_.getID()) { |
| // Broadcast right (smaller) side upon partitioned nlj. |
| for (partition_id input_part_id = 0; input_part_id < num_partitions_; ++input_part_id) { |
| right_relation_block_ids_[input_part_id].push_back(input_block_id); |
| } |
| } else { |
| LOG(FATAL) << "The input block sent to the NestedLoopsJoinOperator belongs " |
| << "to a different relation than the left and right relations"; |
| } |
| } |
| |
| QueryContext::insert_destination_id getInsertDestinationID() const override { |
| return output_destination_index_; |
| } |
| |
| const relation_id getOutputRelationID() const override { |
| return output_relation_.getID(); |
| } |
| |
| const CatalogRelation& left_input_relation() const { |
| return left_input_relation_; |
| } |
| |
| const CatalogRelation& right_input_relation() const { |
| return right_input_relation_; |
| } |
| |
| private: |
| /** |
| * @brief Pairs block IDs from left and right relation block IDs and generates |
| * NestedLoopsJoinWorkOrders and pushes them to the WorkOrdersContainer |
| * when both relations are not stored relations. |
| * |
| * @param container A pointer to the WorkOrdersContainer to store the |
| * resulting WorkOrders. |
| * @param query_context The QueryContext that stores query execution states. |
| * @param storage_manager The StorageManager to use. |
| * @param part_id The partition ID. |
| * @param left_min The starting index in left_relation_block_ids_ from where |
| * we begin generating NestedLoopsJoinWorkOrders. |
| * @param left_max The index in left_relation_block_ids_ until which we |
| * generate NestedLoopsJoinWorkOrders (excluding left_max). |
| * @param right_min The starting index in right_relation_block_ids_ from where |
| * we begin generating NestedLoopsJoinWorkOrders. |
| * @param right_max The index in right_relation_block_ids_ until which we |
| * generate NestedLoopsJoinWorkOrders. (excluding right_max). |
| * |
| * @return The number of workorders generated during the execution of this |
| * function. |
| **/ |
| std::size_t getAllWorkOrdersHelperBothNotStored(WorkOrdersContainer *container, |
| QueryContext *query_context, |
| StorageManager *storage_manager, |
| const partition_id part_id, |
| std::vector<block_id>::size_type left_min, |
| std::vector<block_id>::size_type left_max, |
| std::vector<block_id>::size_type right_min, |
| std::vector<block_id>::size_type right_max); |
| |
| /** |
| * @brief Pairs block IDs from left and right relation block IDs and generates |
| * NestedLoopsJoinWorkOrders and pushes them to the WorkOrdersContainer |
| * when only one relation is a stored relation. |
| * |
| * @param container A pointer to the WorkOrdersContainer to store the |
| * resulting WorkOrders. |
| * @param query_context The QueryContext that stores query execution states. |
| * @param storage_manager The StorageManager to use. |
| * |
| * @return Whether all work orders have been generated. |
| **/ |
| bool getAllWorkOrdersHelperOneStored(WorkOrdersContainer *container, |
| QueryContext *query_context, |
| StorageManager *storage_manager); |
| |
| /** |
| * @brief Pairs block IDs from left and right relation block IDs and generates |
| * NestedLoopsJoinWorkOrder protos and pushes them to the |
| * WorkOrderProtosContainer when both relations are not stored |
| * relations. |
| * |
| * @param container A pointer to the WorkOrderProtosContainer to store the |
| * resulting WorkOrder protos. |
| * @param left_min The starting index in left_relation_block_ids_ from where |
| * we begin generating NestedLoopsJoinWorkOrders. |
| * @param left_max The index in left_relation_block_ids_ until which we |
| * generate NestedLoopsJoinWorkOrders (excluding left_max). |
| * @param right_min The starting index in right_relation_block_ids_ from where |
| * we begin generating NestedLoopsJoinWorkOrders. |
| * @param right_max The index in right_relation_block_ids_ until which we |
| * generate NestedLoopsJoinWorkOrders. (excluding right_max). |
| * |
| * @return The number of workorder protos generated during the execution of this |
| * function. |
| **/ |
| std::size_t getAllWorkOrderProtosHelperBothNotStored(WorkOrderProtosContainer *container, |
| const partition_id part_id, |
| const std::vector<block_id>::size_type left_min, |
| const std::vector<block_id>::size_type left_max, |
| const std::vector<block_id>::size_type right_min, |
| const std::vector<block_id>::size_type right_max); |
| |
| /** |
| * @brief Pairs block IDs from left and right relation block IDs and generates |
| * NestedLoopsJoinWorkOrder protos and pushes them to the |
| * WorkOrderProtosContainer when only one relation is a stored relation. |
| * |
| * @param container A pointer to the WorkOrderProtosContainer to store the |
| * resulting WorkOrder protos. |
| * |
| * @return Whether all work orders have been generated. |
| **/ |
| bool getAllWorkOrderProtosHelperOneStored(WorkOrderProtosContainer *container); |
| |
| serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, |
| const block_id left_block, |
| const block_id right_block); |
| |
| const std::size_t nested_loops_join_index_; |
| |
| const CatalogRelation &left_input_relation_; |
| const CatalogRelation &right_input_relation_; |
| |
| const CatalogRelation &output_relation_; |
| const QueryContext::insert_destination_id output_destination_index_; |
| |
| const QueryContext::predicate_id join_predicate_index_; |
| const QueryContext::scalar_group_id selection_index_; |
| |
| const bool left_relation_is_stored_; |
| const bool right_relation_is_stored_; |
| |
| std::vector<BlocksInPartition> left_relation_block_ids_; |
| std::vector<BlocksInPartition> right_relation_block_ids_; |
| |
| // At a given point of time, we have paired num_left_workorders_generated[part_id] |
| // number of blocks from the left relation with num_right_workorders_generated[part_id] |
| // number of blocks from the right relation for a given 'part_id'. |
| std::vector<std::size_t> num_left_workorders_generated_; |
| std::vector<std::size_t> num_right_workorders_generated_; |
| |
| bool done_feeding_left_relation_; |
| bool done_feeding_right_relation_; |
| |
| // Applicable only when both the relations are stored relations. |
| bool all_workorders_generated_; |
| |
| DISALLOW_COPY_AND_ASSIGN(NestedLoopsJoinOperator); |
| }; |
| |
| /** |
| * @brief A WorkOrder produced by NestedLoopsJoinOperator |
| **/ |
| class NestedLoopsJoinWorkOrder : public WorkOrder { |
| public: |
| /** |
| * @brief Constructor. |
| * |
| * @param query_id The ID of the query to which this operator belongs. |
| * @param left_input_relation The first relation in the join (order is not |
| * actually important). |
| * @param right_input_relation The second relation in the join (order is not |
| * actually important). |
| * @param part_id The partition id. |
| * @param left_block_id The block id of the first relation. |
| * @param right_block_id The block id of the second relation. |
| * @param join_predicate The join predicate to evaluate for each pair of |
| * tuples in the input relations. (cannot be NULL). |
| * @param selection A list of Scalars corresponding to the relation attributes |
| * in \c output_destination. Each Scalar is evaluated for the joined |
| * tuples, and the resulting value is inserted into the join result. |
| * @param output_destination The InsertDestination to insert the join results. |
| * @param storage_manager The StorageManager to use. |
| **/ |
| NestedLoopsJoinWorkOrder( |
| const std::size_t query_id, |
| const CatalogRelationSchema &left_input_relation, |
| const CatalogRelationSchema &right_input_relation, |
| const partition_id part_id, |
| const block_id left_block_id, |
| const block_id right_block_id, |
| const Predicate *join_predicate, |
| const std::vector<std::unique_ptr<const Scalar>> &selection, |
| InsertDestination *output_destination, |
| StorageManager *storage_manager) |
| : WorkOrder(query_id, part_id), |
| left_input_relation_(left_input_relation), |
| right_input_relation_(right_input_relation), |
| left_block_id_(left_block_id), |
| right_block_id_(right_block_id), |
| join_predicate_(DCHECK_NOTNULL(join_predicate)), |
| selection_(selection), |
| output_destination_(DCHECK_NOTNULL(output_destination)), |
| storage_manager_(DCHECK_NOTNULL(storage_manager)) {} |
| |
| ~NestedLoopsJoinWorkOrder() override {} |
| |
| /** |
| * @exception TupleTooLargeForBlock A tuple produced by this join was too |
| * large to insert into an empty block provided by |
| * output_destination_. Join may be partially complete (with |
| * some tuples inserted into the destination) when this exception |
| * is thrown, causing potential inconsistency. |
| **/ |
| void execute() override; |
| |
| private: |
| template <bool LEFT_PACKED, bool RIGHT_PACKED> |
| void executeHelper(const TupleStorageSubBlock &left_store, |
| const TupleStorageSubBlock &right_store); |
| |
| const CatalogRelationSchema &left_input_relation_, &right_input_relation_; |
| const block_id left_block_id_, right_block_id_; |
| const Predicate *join_predicate_; |
| const std::vector<std::unique_ptr<const Scalar>> &selection_; |
| |
| InsertDestination *output_destination_; |
| StorageManager *storage_manager_; |
| |
| DISALLOW_COPY_AND_ASSIGN(NestedLoopsJoinWorkOrder); |
| }; |
| |
| /** @} */ |
| |
| } // namespace quickstep |
| |
| #endif // QUICKSTEP_RELATIONAL_OPERATORS_NESTED_LOOPS_JOIN_OPERATOR_HPP_ |