Added num_partitions to RelationalOperator.
diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt
index 0ca014e..1aa2b41 100644
--- a/catalog/CMakeLists.txt
+++ b/catalog/CMakeLists.txt
@@ -108,6 +108,7 @@
quickstep_catalog_Catalog_proto
quickstep_catalog_IndexScheme
quickstep_catalog_PartitionScheme
+ quickstep_catalog_PartitionSchemeHeader
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageBlockLayout
quickstep_storage_StorageBlockLayout_proto
diff --git a/catalog/CatalogRelation.cpp b/catalog/CatalogRelation.cpp
index 793fc2d..38c86f6 100644
--- a/catalog/CatalogRelation.cpp
+++ b/catalog/CatalogRelation.cpp
@@ -33,6 +33,7 @@
#endif
#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageBlockLayout.hpp"
#include "storage/StorageBlockLayout.pb.h"
@@ -180,6 +181,10 @@
void CatalogRelation::setPartitionScheme(PartitionScheme* partition_scheme) {
DCHECK_EQ(0u, size_blocks());
partition_scheme_.reset(partition_scheme);
+  // Keep the cached count in sync: with no scheme the relation is a single partition.
+  num_partitions_ = partition_scheme_
+      ? partition_scheme_->getPartitionSchemeHeader().getNumPartitions()
+      : 1u;
}
void CatalogRelation::setDefaultStorageBlockLayout(StorageBlockLayout *default_layout) {
diff --git a/catalog/CatalogRelation.hpp b/catalog/CatalogRelation.hpp
index af7f273..e40b599 100644
--- a/catalog/CatalogRelation.hpp
+++ b/catalog/CatalogRelation.hpp
@@ -81,6 +81,7 @@
bool temporary = false)
: CatalogRelationSchema(parent, name, id, temporary),
default_layout_(nullptr),
+ num_partitions_(1u),
statistics_(new CatalogRelationStatistics()) {
}
@@ -138,6 +139,15 @@
}
/**
+ * @brief Get the number of partitions for the relation.
+ *
+ * @return The number of partitions of the relation (one by default, when no partition scheme has been set).
+ **/
+ std::size_t getNumPartitions() const {
+ return num_partitions_;
+ }
+
+ /**
* @brief Check if a NUMA placement scheme is available for the relation.
*
* @return True if the relation has a NUMA placement scheme, false otherwise.
@@ -423,6 +433,7 @@
// A relation may or may not have a Partition Scheme
// assosiated with it.
std::unique_ptr<PartitionScheme> partition_scheme_;
+ std::size_t num_partitions_;
// Index scheme associated with this relation.
// Defines a set of indices defined for this relation.
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 9bfd136..d43432c 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -747,13 +747,6 @@
insert_destination_proto);
// Create and add a Select operator.
- const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme();
-
- const std::size_t num_partitions =
- input_partition_scheme
- ? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
- : 1u;
-
// Use the "simple" form of the selection operator (a pure projection that
// doesn't require any expression evaluation or intermediate copies) if
// possible.
@@ -766,16 +759,14 @@
insert_destination_index,
execution_predicate_index,
move(attributes),
- input_relation_info->isStoredRelation(),
- num_partitions)
+ input_relation_info->isStoredRelation())
: new SelectOperator(query_handle_->query_id(),
input_relation,
*output_relation,
insert_destination_index,
execution_predicate_index,
project_expressions_group_index,
- input_relation_info->isStoredRelation(),
- num_partitions);
+ input_relation_info->isStoredRelation());
const QueryPlan::DAGNodeIndex select_index =
execution_plan_->addRelationalOperator(op);
@@ -847,12 +838,6 @@
findRelationInfoOutputByPhysical(build_physical);
const CatalogRelation &build_relation = *build_relation_info->relation;
- const PartitionScheme *build_partition_scheme = build_relation.getPartitionScheme();
-
- const std::size_t build_num_partitions =
- build_partition_scheme
- ? build_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
- : 1u;
// Create a BuildLIPFilterOperator for the FilterJoin. This operator builds
// LIP filters that are applied properly in downstream operators to achieve
@@ -862,7 +847,6 @@
new BuildLIPFilterOperator(
query_handle_->query_id(),
build_relation,
- build_num_partitions,
build_side_predicate_index,
build_relation_info->isStoredRelation()));
@@ -954,7 +938,7 @@
const CatalogRelationInfo *build_relation_info =
findRelationInfoOutputByPhysical(build_physical);
- const CatalogRelationInfo *probe_operator_info =
+ const CatalogRelationInfo *probe_relation_info =
findRelationInfoOutputByPhysical(probe_physical);
// Create a vector that indicates whether each project expression is using
@@ -970,9 +954,10 @@
}
const CatalogRelation *build_relation = build_relation_info->relation;
+ const CatalogRelation *probe_relation = probe_relation_info->relation;
// FIXME(quickstep-team): Add support for self-join.
- if (build_relation == probe_operator_info->relation) {
+ if (build_relation == probe_relation) {
THROW_SQL_ERROR() << "Self-join is not supported";
}
@@ -982,9 +967,7 @@
S::QueryContext::HashTableContext *hash_table_context_proto =
query_context_proto_->add_join_hash_tables();
- const P::PartitionSchemeHeader *probe_partition_scheme_header = probe_physical->getOutputPartitionSchemeHeader();
- const std::size_t probe_num_partitions =
- probe_partition_scheme_header ? probe_partition_scheme_header->num_partitions : 1u;
+ const std::size_t probe_num_partitions = probe_relation->getNumPartitions();
hash_table_context_proto->set_num_partitions(probe_num_partitions);
S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table();
@@ -1052,8 +1035,8 @@
new HashJoinOperator(
query_handle_->query_id(),
*build_relation,
- *probe_operator_info->relation,
- probe_operator_info->isStoredRelation(),
+ *probe_relation,
+ probe_relation_info->isStoredRelation(),
probe_attribute_ids,
any_probe_attributes_nullable,
probe_num_partitions,
@@ -1083,9 +1066,9 @@
build_relation_info->producer_operator_index,
true /* is_pipeline_breaker */);
}
- if (!probe_operator_info->isStoredRelation()) {
+ if (!probe_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(join_operator_index,
- probe_operator_info->producer_operator_index,
+ probe_relation_info->producer_operator_index,
false /* is_pipeline_breaker */);
}
execution_plan_->addDirectDependency(join_operator_index,
@@ -1140,15 +1123,11 @@
THROW_SQL_ERROR() << "NestedLoopsJoin does not support self-join yet";
}
- const PartitionScheme *left_partition_scheme = left_relation.getPartitionScheme();
- const std::size_t num_partitions =
- left_partition_scheme ? left_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
- : 1u;
+ const std::size_t num_partitions = left_relation.getNumPartitions();
#ifdef QUICKSTEP_DEBUG
- const PartitionScheme *right_partition_scheme = right_relation.getPartitionScheme();
- if (right_partition_scheme) {
- DCHECK_EQ(num_partitions, right_partition_scheme->getPartitionSchemeHeader().getNumPartitions());
+ if (right_relation.hasPartitionScheme()) {
+ DCHECK_EQ(num_partitions, right_relation.getNumPartitions());
}
#endif
@@ -1549,12 +1528,6 @@
const CatalogRelationInfo *selection_relation_info =
findRelationInfoOutputByPhysical(physical_plan->selection());
const CatalogRelation &selection_relation = *selection_relation_info->relation;
- const PartitionScheme *selection_partition_scheme = selection_relation.getPartitionScheme();
-
- const std::size_t num_partitions =
- selection_partition_scheme
- ? selection_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
- : 1u;
// Prepare the attributes, which are output columns of the selection relation.
std::vector<attribute_id> attributes;
@@ -1580,8 +1553,7 @@
insert_destination_index,
QueryContext::kInvalidPredicateId,
move(attributes),
- selection_relation_info->isStoredRelation(),
- num_partitions);
+ selection_relation_info->isStoredRelation());
const QueryPlan::DAGNodeIndex insert_selection_index =
execution_plan_->addRelationalOperator(insert_selection_op);
@@ -1756,11 +1728,7 @@
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
const CatalogRelation &input_relation = *input_relation_info->relation;
- const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme();
- const size_t num_partitions =
- input_partition_scheme
- ? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
- : 1u;
+ const size_t num_partitions = input_relation.getNumPartitions();
// Create aggr state proto.
const QueryContext::aggregation_state_id aggr_state_index =
@@ -2312,7 +2280,9 @@
// Get input.
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
- window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
+ const CatalogRelation &input_relation = *input_relation_info->relation;
+ DCHECK_EQ(1u, input_relation.getNumPartitions());
+ window_aggr_state_proto->set_input_relation_id(input_relation.getID());
// Get window aggregate function expression.
const E::AliasPtr &named_window_aggregate_expression =
@@ -2382,7 +2352,7 @@
const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
execution_plan_->addRelationalOperator(
new WindowAggregationOperator(query_handle_->query_id(),
- *input_relation_info->relation,
+ input_relation,
*output_relation,
window_aggr_state_index,
insert_destination_index));
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 2fb620b..c673ff5 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -76,11 +76,10 @@
bool input_relation_is_stored,
const QueryContext::aggregation_state_id aggr_state_index,
const std::size_t num_partitions)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, num_partitions),
input_relation_(input_relation),
input_relation_is_stored_(input_relation_is_stored),
aggr_state_index_(aggr_state_index),
- num_partitions_(num_partitions),
input_relation_block_ids_(num_partitions),
num_workorders_generated_(num_partitions),
started_(false) {
@@ -136,7 +135,6 @@
const CatalogRelation &input_relation_;
const bool input_relation_is_stored_;
const QueryContext::aggregation_state_id aggr_state_index_;
- const std::size_t num_partitions_;
// The index is the partition id.
std::vector<BlocksInPartition> input_relation_block_ids_;
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
index 8791a38..9cfd93d 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.hpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -79,12 +79,11 @@
const bool input_relation_is_stored,
const QueryContext::aggregation_state_id aggr_state_index,
const std::size_t num_partitions)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, num_partitions),
input_relation_(input_relation),
build_attribute_(build_attribute),
input_relation_is_stored_(input_relation_is_stored),
aggr_state_index_(aggr_state_index),
- num_partitions_(num_partitions),
input_relation_block_ids_(num_partitions),
num_workorders_generated_(num_partitions),
started_(false) {
@@ -138,7 +137,6 @@
const attribute_id build_attribute_;
const bool input_relation_is_stored_;
const QueryContext::aggregation_state_id aggr_state_index_;
- const std::size_t num_partitions_;
// The index is the partition id.
std::vector<BlocksInPartition> input_relation_block_ids_;
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index d18d9fb..dfb6dfe 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -89,12 +89,11 @@
const bool any_join_key_attributes_nullable,
const std::size_t num_partitions,
const QueryContext::join_hash_table_id hash_table_index)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, num_partitions),
input_relation_(input_relation),
input_relation_is_stored_(input_relation_is_stored),
join_key_attributes_(join_key_attributes),
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
- num_partitions_(num_partitions),
is_broadcast_join_(num_partitions > 1u && !input_relation.hasPartitionScheme()),
hash_table_index_(hash_table_index),
input_relation_block_ids_(num_partitions),
@@ -102,8 +101,9 @@
started_(false) {
if (input_relation_is_stored) {
if (input_relation.hasPartitionScheme()) {
+ DCHECK_EQ(num_partitions_, input_relation.getNumPartitions());
+
const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
- DCHECK_EQ(part_scheme.getPartitionSchemeHeader().getNumPartitions(), num_partitions_);
for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
}
@@ -162,7 +162,6 @@
const bool input_relation_is_stored_;
const std::vector<attribute_id> join_key_attributes_;
const bool any_join_key_attributes_nullable_;
- const std::size_t num_partitions_;
const bool is_broadcast_join_;
const QueryContext::join_hash_table_id hash_table_index_;
diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp
index 9b23dd9..41c3294 100644
--- a/relational_operators/BuildLIPFilterOperator.hpp
+++ b/relational_operators/BuildLIPFilterOperator.hpp
@@ -28,7 +28,6 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "catalog/PartitionScheme.hpp"
-#include "catalog/PartitionSchemeHeader.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -70,8 +69,6 @@
*
* @param query_id The ID of the query to which this operator belongs.
* @param input_relation The relation to build LIP filters on.
- * @param num_partitions The number of partitions in 'input_relation'.
- * If no partitions, it is one.
* @param build_side_predicate_index The index of the predicate in QueryContext
* where the predicate is to be applied to the input relation before
* building the LIP filters (or kInvalidPredicateId if no predicate is
@@ -82,21 +79,18 @@
**/
BuildLIPFilterOperator(const std::size_t query_id,
const CatalogRelation &input_relation,
- const std::size_t num_partitions,
const QueryContext::predicate_id build_side_predicate_index,
const bool input_relation_is_stored)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, input_relation.getNumPartitions()),
input_relation_(input_relation),
- num_partitions_(num_partitions),
build_side_predicate_index_(build_side_predicate_index),
input_relation_is_stored_(input_relation_is_stored),
- input_relation_block_ids_(num_partitions),
- num_workorders_generated_(num_partitions),
+ input_relation_block_ids_(num_partitions_),
+ num_workorders_generated_(num_partitions_),
started_(false) {
if (input_relation_is_stored) {
if (input_relation.hasPartitionScheme()) {
const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
- DCHECK_EQ(part_scheme.getPartitionSchemeHeader().getNumPartitions(), num_partitions_);
for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
}
@@ -148,7 +142,6 @@
serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, const block_id block);
const CatalogRelation &input_relation_;
- const std::size_t num_partitions_;
const QueryContext::predicate_id build_side_predicate_index_;
const bool input_relation_is_stored_;
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c0def6f..57ba9f9 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -153,7 +153,6 @@
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
quickstep_catalog_PartitionScheme
- quickstep_catalog_PartitionSchemeHeader
quickstep_queryexecution_QueryContext
quickstep_queryexecution_WorkOrderProtosContainer
quickstep_queryexecution_WorkOrdersContainer
@@ -190,6 +189,7 @@
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionScheme
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
@@ -394,7 +394,7 @@
glog
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
- quickstep_catalog_PartitionSchemeHeader
+ quickstep_catalog_PartitionScheme
quickstep_queryexecution_QueryContext
quickstep_queryexecution_WorkOrderProtosContainer
quickstep_queryexecution_WorkOrdersContainer
@@ -534,6 +534,7 @@
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionScheme
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
diff --git a/relational_operators/CreateIndexOperator.hpp b/relational_operators/CreateIndexOperator.hpp
index 09d5d81..9449f53 100644
--- a/relational_operators/CreateIndexOperator.hpp
+++ b/relational_operators/CreateIndexOperator.hpp
@@ -64,7 +64,7 @@
CatalogRelation *relation,
const std::string &index_name,
IndexSubBlockDescription &&index_description) // NOLINT(whitespace/operators)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, 0u),
relation_(DCHECK_NOTNULL(relation)),
index_name_(index_name),
index_description_(index_description) {}
diff --git a/relational_operators/CreateTableOperator.hpp b/relational_operators/CreateTableOperator.hpp
index 52c7846..827ed72 100644
--- a/relational_operators/CreateTableOperator.hpp
+++ b/relational_operators/CreateTableOperator.hpp
@@ -64,7 +64,7 @@
CreateTableOperator(const std::size_t query_id,
CatalogRelation *relation,
CatalogDatabase *database)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, 0u),
relation_(DCHECK_NOTNULL(relation)),
database_(DCHECK_NOTNULL(database)) {}
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index a8723a9..4958024 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -25,6 +25,7 @@
#include <vector>
#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionUtil.hpp"
@@ -58,36 +59,42 @@
return true;
}
- for (const block_id input_block_id : relation_block_ids_) {
- container->addNormalWorkOrder(
- new DeleteWorkOrder(query_id_,
- relation_,
- input_block_id,
- predicate,
- storage_manager,
- op_index_,
- scheduler_client_id,
- bus),
- op_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id input_block_id : relation_block_ids_[part_id]) {
+ container->addNormalWorkOrder(
+ new DeleteWorkOrder(query_id_,
+ relation_,
+ part_id,
+ input_block_id,
+ predicate,
+ storage_manager,
+ op_index_,
+ scheduler_client_id,
+ bus),
+ op_index_);
+ }
}
started_ = true;
return true;
- } else {
- while (num_workorders_generated_ < relation_block_ids_.size()) {
+ }
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < relation_block_ids_[part_id].size()) {
container->addNormalWorkOrder(
new DeleteWorkOrder(query_id_,
relation_,
- relation_block_ids_[num_workorders_generated_],
+ part_id,
+ relation_block_ids_[part_id][num_workorders_generated_[part_id]],
predicate,
storage_manager,
op_index_,
scheduler_client_id,
bus),
op_index_);
- ++num_workorders_generated_;
+ ++num_workorders_generated_[part_id];
}
- return done_feeding_input_relation_;
}
+ return done_feeding_input_relation_;
}
bool DeleteOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
@@ -97,23 +104,27 @@
return true;
}
- for (const block_id input_block_id : relation_block_ids_) {
- container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id input_block_id : relation_block_ids_[part_id]) {
+ container->addWorkOrderProto(createWorkOrderProto(part_id, input_block_id), op_index_);
+ }
}
started_ = true;
return true;
- } else {
- while (num_workorders_generated_ < relation_block_ids_.size()) {
- container->addWorkOrderProto(
- createWorkOrderProto(relation_block_ids_[num_workorders_generated_]),
- op_index_);
- ++num_workorders_generated_;
- }
- return done_feeding_input_relation_;
}
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < relation_block_ids_[part_id].size()) {
+ container->addWorkOrderProto(
+ createWorkOrderProto(part_id, relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ }
+ }
+ return done_feeding_input_relation_;
}
-serialization::WorkOrder* DeleteOperator::createWorkOrderProto(const block_id block) {
+serialization::WorkOrder* DeleteOperator::createWorkOrderProto(const partition_id part_id, const block_id block) {
serialization::WorkOrder *proto = new serialization::WorkOrder;
proto->set_work_order_type(serialization::DELETE);
proto->set_query_id(query_id_);
@@ -122,6 +133,7 @@
proto->SetExtension(serialization::DeleteWorkOrder::relation_id, relation_.getID());
proto->SetExtension(serialization::DeleteWorkOrder::predicate_index, predicate_index_);
proto->SetExtension(serialization::DeleteWorkOrder::block_id, block);
+ proto->SetExtension(serialization::DeleteWorkOrder::partition_id, part_id);
return proto;
}
@@ -139,6 +151,7 @@
proto.set_block_id(input_block_id_);
proto.set_relation_id(input_relation_.getID());
proto.set_query_id(query_id_);
+ proto.set_partition_id(partition_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index aed37f6..b084560 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -26,6 +26,7 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "relational_operators/RelationalOperator.hpp"
@@ -73,14 +74,26 @@
const CatalogRelation &relation,
const QueryContext::predicate_id predicate_index,
const bool relation_is_stored)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, relation.getNumPartitions()),
relation_(relation),
predicate_index_(predicate_index),
relation_is_stored_(relation_is_stored),
started_(false),
- relation_block_ids_(relation_is_stored ? relation.getBlocksSnapshot()
- : std::vector<block_id>()),
- num_workorders_generated_(0) {}
+ relation_block_ids_(num_partitions_),
+ num_workorders_generated_(num_partitions_) {
+ if (relation_is_stored) {
+ if (relation.hasPartitionScheme()) {
+ const PartitionScheme &part_scheme = *relation.getPartitionScheme();
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+ }
+ } else {
+ DCHECK_EQ(1u, num_partitions_);
+ relation_block_ids_[0] = relation.getBlocksSnapshot();
+ }
+ }
+ }
~DeleteOperator() override {}
@@ -107,16 +120,17 @@
void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
const partition_id part_id) override {
DCHECK(!relation_is_stored_);
- relation_block_ids_.push_back(input_block_id);
+ relation_block_ids_[part_id].push_back(input_block_id);
}
private:
/**
* @brief Create Work Order proto.
*
+ * @param part_id The id of the partition that 'block' belongs to.
* @param block The block id used in the Work Order.
**/
- serialization::WorkOrder* createWorkOrderProto(const block_id block);
+ serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, const block_id block);
const CatalogRelation &relation_;
const QueryContext::predicate_id predicate_index_;
@@ -125,8 +139,8 @@
bool started_;
- std::vector<block_id> relation_block_ids_;
- std::vector<block_id>::size_type num_workorders_generated_;
+ std::vector<std::vector<block_id>> relation_block_ids_;
+ std::vector<std::size_t> num_workorders_generated_;
DISALLOW_COPY_AND_ASSIGN(DeleteOperator);
};
@@ -141,6 +155,7 @@
*
* @param query_id The ID of the query to which this workorder belongs.
* @param input_relation The relation to perform the DELETE over.
+ * @param part_id The id of the partition that contains 'input_block_id'.
* @param input_block_id The block Id.
* @param predicate All tuples matching \c predicate will be deleted (If
* NULL, then all tuples will be deleted).
@@ -152,13 +167,14 @@
**/
DeleteWorkOrder(const std::size_t query_id,
const CatalogRelationSchema &input_relation,
+ const partition_id part_id,
const block_id input_block_id,
const Predicate *predicate,
StorageManager *storage_manager,
const std::size_t delete_operator_index,
const tmb::client_id scheduler_client_id,
MessageBus *bus)
- : WorkOrder(query_id),
+ : WorkOrder(query_id, part_id),
input_relation_(input_relation),
input_block_id_(input_block_id),
predicate_(predicate),
diff --git a/relational_operators/DestroyAggregationStateOperator.hpp b/relational_operators/DestroyAggregationStateOperator.hpp
index c5b092f..f285957 100644
--- a/relational_operators/DestroyAggregationStateOperator.hpp
+++ b/relational_operators/DestroyAggregationStateOperator.hpp
@@ -61,9 +61,8 @@
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
const std::size_t num_partitions)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, num_partitions),
aggr_state_index_(aggr_state_index),
- num_partitions_(num_partitions),
work_generated_(false) {}
~DestroyAggregationStateOperator() override {}
@@ -86,7 +85,6 @@
private:
const QueryContext::aggregation_state_id aggr_state_index_;
- const std::size_t num_partitions_;
bool work_generated_;
DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateOperator);
diff --git a/relational_operators/DestroyHashOperator.cpp b/relational_operators/DestroyHashOperator.cpp
index 4b93f0c..490c316 100644
--- a/relational_operators/DestroyHashOperator.cpp
+++ b/relational_operators/DestroyHashOperator.cpp
@@ -38,7 +38,7 @@
return true;
}
- for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
container->addNormalWorkOrder(
new DestroyHashWorkOrder(query_id_, hash_table_index_, part_id, query_context),
op_index_);
@@ -52,7 +52,7 @@
return true;
}
- for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
serialization::WorkOrder *proto = new serialization::WorkOrder;
proto->set_work_order_type(serialization::DESTROY_HASH);
proto->set_query_id(query_id_);
diff --git a/relational_operators/DestroyHashOperator.hpp b/relational_operators/DestroyHashOperator.hpp
index 3a6cf0e..6c4c997 100644
--- a/relational_operators/DestroyHashOperator.hpp
+++ b/relational_operators/DestroyHashOperator.hpp
@@ -53,14 +53,13 @@
* @brief Constructor.
*
* @param query_id The ID of the query to which this operator belongs.
- * @param build_num_partitions The number of partitions in 'build_relation'.
+ * @param num_partitions The number of partitions; one DestroyHashWorkOrder is generated per partition.
* @param hash_table_index The index of the JoinHashTable in QueryContext.
**/
DestroyHashOperator(const std::size_t query_id,
- const std::size_t build_num_partitions,
+ const std::size_t num_partitions,
const QueryContext::join_hash_table_id hash_table_index)
- : RelationalOperator(query_id),
- build_num_partitions_(build_num_partitions),
+ : RelationalOperator(query_id, num_partitions),
hash_table_index_(hash_table_index),
work_generated_(false) {}
@@ -83,7 +82,6 @@
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
private:
- const std::size_t build_num_partitions_;
const QueryContext::join_hash_table_id hash_table_index_;
bool work_generated_;
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index a6f1ef6..382a7da 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -72,9 +72,8 @@
const std::size_t aggr_state_num_partitions,
const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, num_partitions),
aggr_state_index_(aggr_state_index),
- num_partitions_(num_partitions),
aggr_state_num_partitions_(aggr_state_num_partitions),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
@@ -108,7 +107,7 @@
private:
const QueryContext::aggregation_state_id aggr_state_index_;
- const std::size_t num_partitions_, aggr_state_num_partitions_;
+ const std::size_t aggr_state_num_partitions_;
const CatalogRelation &output_relation_;
const QueryContext::insert_destination_id output_destination_index_;
bool started_;
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 316be66..519718c 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -137,13 +137,12 @@
const QueryContext::scalar_group_id selection_index,
const std::vector<bool> *is_selection_on_build = nullptr,
const JoinType join_type = JoinType::kInnerJoin)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, num_partitions),
build_relation_(build_relation),
probe_relation_(probe_relation),
probe_relation_is_stored_(probe_relation_is_stored),
join_key_attributes_(join_key_attributes),
any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
- num_partitions_(num_partitions),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
hash_table_index_(hash_table_index),
@@ -276,7 +275,6 @@
const bool probe_relation_is_stored_;
const std::vector<attribute_id> join_key_attributes_;
const bool any_join_key_attributes_nullable_;
- const std::size_t num_partitions_;
const CatalogRelation &output_relation_;
const QueryContext::insert_destination_id output_destination_index_;
const QueryContext::join_hash_table_id hash_table_index_;
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
index 0b052c7..cf9abe5 100644
--- a/relational_operators/InitializeAggregationOperator.hpp
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -65,9 +65,8 @@
const QueryContext::aggregation_state_id aggr_state_index,
const std::size_t num_partitions,
const std::size_t aggr_state_num_init_partitions)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, num_partitions),
aggr_state_index_(aggr_state_index),
- num_partitions_(num_partitions),
aggr_state_num_init_partitions_(aggr_state_num_init_partitions),
started_(false) {}
@@ -91,7 +90,7 @@
private:
const QueryContext::aggregation_state_id aggr_state_index_;
- const std::size_t num_partitions_, aggr_state_num_init_partitions_;
+ const std::size_t aggr_state_num_init_partitions_;
bool started_;
DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);
diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp
index 006d496..1b14638 100644
--- a/relational_operators/NestedLoopsJoinOperator.hpp
+++ b/relational_operators/NestedLoopsJoinOperator.hpp
@@ -100,11 +100,10 @@
const QueryContext::scalar_group_id selection_index,
const bool left_relation_is_stored,
const bool right_relation_is_stored)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, num_partitions),
nested_loops_join_index_(nested_loops_join_index),
left_input_relation_(left_input_relation),
right_input_relation_(right_input_relation),
- num_partitions_(num_partitions),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
join_predicate_index_(join_predicate_index),
@@ -296,8 +295,6 @@
const CatalogRelation &left_input_relation_;
const CatalogRelation &right_input_relation_;
- const std::size_t num_partitions_;
-
const CatalogRelation &output_relation_;
const QueryContext::insert_destination_id output_destination_index_;
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index 5de7eb5..29ae194 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -260,6 +260,15 @@
}
/**
+ * @brief Get the number of partitions of the input relation in the operator.
+ *
+ * @return The number of partitions of the input relation.
+ */
+ std::size_t getNumPartitions() const {
+ return num_partitions_;
+ }
+
+ /**
* @brief Deploy a group of LIPFilters to this operator.
*/
void deployLIPFilters(const QueryContext::lip_deployment_id lip_deployment_index,
@@ -273,15 +282,18 @@
* @brief Constructor
*
* @param query_id The ID of the query to which this operator belongs.
- * @param blocking_dependencies_met If those dependencies which break the
- * pipeline have been met.
+ * @param num_partitions The number of partitions of the input relation.
+ * Zero for create table / index operators; one (the default) if not partitioned.
**/
- explicit RelationalOperator(const std::size_t query_id)
+ explicit RelationalOperator(const std::size_t query_id,
+ const std::size_t num_partitions = 1u)
: query_id_(query_id),
+ num_partitions_(num_partitions),
done_feeding_input_relation_(false),
lip_deployment_index_(QueryContext::kInvalidLIPDeploymentId) {}
const std::size_t query_id_;
+ const std::size_t num_partitions_;
bool done_feeding_input_relation_;
std::size_t op_index_;
diff --git a/relational_operators/SaveBlocksOperator.cpp b/relational_operators/SaveBlocksOperator.cpp
index 9d6c3f6..591de12 100644
--- a/relational_operators/SaveBlocksOperator.cpp
+++ b/relational_operators/SaveBlocksOperator.cpp
@@ -22,6 +22,7 @@
#include <vector>
#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/WorkOrder.pb.h"
@@ -38,31 +39,37 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- while (num_workorders_generated_ < destination_block_ids_.size()) {
- container->addNormalWorkOrder(
- new SaveBlocksWorkOrder(
- query_id_,
- destination_block_ids_[num_workorders_generated_],
- force_,
- storage_manager),
- op_index_);
- ++num_workorders_generated_;
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < destination_block_ids_[part_id].size()) {
+ container->addNormalWorkOrder(
+ new SaveBlocksWorkOrder(
+ query_id_,
+ part_id,
+ destination_block_ids_[part_id][num_workorders_generated_[part_id]],
+ force_,
+ storage_manager),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ }
}
return done_feeding_input_relation_;
}
bool SaveBlocksOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- while (num_workorders_generated_ < destination_block_ids_.size()) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::SAVE_BLOCKS);
- proto->set_query_id(query_id_);
- proto->SetExtension(serialization::SaveBlocksWorkOrder::block_id,
- destination_block_ids_[num_workorders_generated_]);
- proto->SetExtension(serialization::SaveBlocksWorkOrder::force, force_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ while (num_workorders_generated_[part_id] < destination_block_ids_[part_id].size()) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::SAVE_BLOCKS);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::SaveBlocksWorkOrder::block_id,
+ destination_block_ids_[part_id][num_workorders_generated_[part_id]]);
+ proto->SetExtension(serialization::SaveBlocksWorkOrder::force, force_);
+ proto->SetExtension(serialization::SaveBlocksWorkOrder::partition_id, part_id);
- container->addWorkOrderProto(proto, op_index_);
+ container->addWorkOrderProto(proto, op_index_);
- ++num_workorders_generated_;
+ ++num_workorders_generated_[part_id];
+ }
}
return done_feeding_input_relation_;
}
diff --git a/relational_operators/SaveBlocksOperator.hpp b/relational_operators/SaveBlocksOperator.hpp
index 4489299..cea2d7e 100644
--- a/relational_operators/SaveBlocksOperator.hpp
+++ b/relational_operators/SaveBlocksOperator.hpp
@@ -24,6 +24,7 @@
#include <string>
#include <vector>
+#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -38,7 +39,6 @@
namespace quickstep {
-class CatalogRelation;
class QueryContext;
class StorageManager;
class WorkOrderProtosContainer;
@@ -64,10 +64,11 @@
SaveBlocksOperator(const std::size_t query_id,
CatalogRelation *relation,
const bool force = false)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, relation->getNumPartitions()),
force_(force),
relation_(relation),
- num_workorders_generated_(0) {}
+ destination_block_ids_(num_partitions_),
+ num_workorders_generated_(num_partitions_) {}
~SaveBlocksOperator() override {}
@@ -89,7 +90,7 @@
void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
const partition_id part_id) override {
- destination_block_ids_.push_back(input_block_id);
+ destination_block_ids_[part_id].push_back(input_block_id);
}
void updateCatalogOnCompletion() override;
@@ -98,9 +99,9 @@
const bool force_;
CatalogRelation *relation_;
- std::vector<block_id> destination_block_ids_;
+ std::vector<std::vector<block_id>> destination_block_ids_;
- std::vector<block_id>::size_type num_workorders_generated_;
+ std::vector<std::size_t> num_workorders_generated_;
DISALLOW_COPY_AND_ASSIGN(SaveBlocksOperator);
};
@@ -114,16 +115,18 @@
* @brief Constructor.
*
* @param query_id The ID of the query to which this operator belongs.
+ * @param part_id The partition id.
* @param save_block_id The id of the block to save.
* @param force If true, force writing of all blocks to disk, otherwise only
* write dirty blocks.
* @param storage_manager The StorageManager to use.
**/
SaveBlocksWorkOrder(const std::size_t query_id,
+ const partition_id part_id,
const block_id save_block_id,
const bool force,
StorageManager *storage_manager)
- : WorkOrder(query_id),
+ : WorkOrder(query_id, part_id),
save_block_id_(save_block_id),
force_(force),
storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index f4937b3..a71f4c1 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -32,7 +32,7 @@
#include "catalog/NUMAPlacementScheme.hpp"
#endif
-#include "catalog/PartitionSchemeHeader.hpp"
+#include "catalog/PartitionScheme.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -85,8 +85,6 @@
* @param input_relation_is_stored If input_relation is a stored relation and
* is fully available to the operator before it can start generating
* workorders.
- * @param num_partitions The number of partitions in 'input_relation'.
- * If no partitions, it is one.
**/
SelectOperator(
const std::size_t query_id,
@@ -95,17 +93,15 @@
const QueryContext::insert_destination_id output_destination_index,
const QueryContext::predicate_id predicate_index,
const QueryContext::scalar_group_id selection_index,
- const bool input_relation_is_stored,
- const std::size_t num_partitions)
- : RelationalOperator(query_id),
+ const bool input_relation_is_stored)
+ : RelationalOperator(query_id, input_relation.getNumPartitions()),
input_relation_(input_relation),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
predicate_index_(predicate_index),
selection_index_(selection_index),
- num_partitions_(num_partitions),
- input_relation_block_ids_(num_partitions),
- num_workorders_generated_(num_partitions),
+ input_relation_block_ids_(num_partitions_),
+ num_workorders_generated_(num_partitions_),
simple_projection_(false),
input_relation_is_stored_(input_relation_is_stored),
started_(false) {
@@ -120,6 +116,7 @@
input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
}
} else {
+ DCHECK_EQ(1u, num_partitions_);
input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
}
}
@@ -143,8 +140,6 @@
* @param input_relation_is_stored If input_relation is a stored relation and
* is fully available to the operator before it can start generating
* workorders.
- * @param num_partitions The number of partitions in 'input_relation'.
- * If no partitions, it is one.
**/
SelectOperator(
const std::size_t query_id,
@@ -153,18 +148,16 @@
const QueryContext::insert_destination_id output_destination_index,
const QueryContext::predicate_id predicate_index,
std::vector<attribute_id> &&selection,
- const bool input_relation_is_stored,
- const std::size_t num_partitions)
- : RelationalOperator(query_id),
+ const bool input_relation_is_stored)
+ : RelationalOperator(query_id, input_relation.getNumPartitions()),
input_relation_(input_relation),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
predicate_index_(predicate_index),
selection_index_(QueryContext::kInvalidScalarGroupId),
simple_selection_(std::move(selection)),
- num_partitions_(num_partitions),
- input_relation_block_ids_(num_partitions),
- num_workorders_generated_(num_partitions),
+ input_relation_block_ids_(num_partitions_),
+ num_workorders_generated_(num_partitions_),
simple_projection_(true),
input_relation_is_stored_(input_relation_is_stored),
started_(false) {
@@ -179,6 +172,7 @@
input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
}
} else {
+ DCHECK_EQ(1u, num_partitions_);
input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
}
}
@@ -239,7 +233,6 @@
const QueryContext::scalar_group_id selection_index_;
const std::vector<attribute_id> simple_selection_;
- const std::size_t num_partitions_;
// A vector of vectors V where V[i] indicates the list of block IDs of the
// input relation that belong to the partition i.
std::vector<std::vector<block_id>> input_relation_block_ids_;
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 08bfa59..0f7f999 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -25,6 +25,7 @@
#include <utility>
#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionUtil.hpp"
@@ -57,21 +58,24 @@
}
DCHECK(query_context != nullptr);
- for (const block_id input_block_id : input_blocks_) {
- container->addNormalWorkOrder(
- new UpdateWorkOrder(
- query_id_,
- relation_,
- input_block_id,
- query_context->getPredicate(predicate_index_),
- query_context->getUpdateGroup(update_group_index_),
- query_context->getInsertDestination(
- relocation_destination_index_),
- storage_manager,
- op_index_,
- scheduler_client_id,
- bus),
- op_index_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id input_block_id : input_blocks_[part_id]) {
+ container->addNormalWorkOrder(
+ new UpdateWorkOrder(
+ query_id_,
+ relation_,
+ part_id,
+ input_block_id,
+ query_context->getPredicate(predicate_index_),
+ query_context->getUpdateGroup(update_group_index_),
+ query_context->getInsertDestination(
+ relocation_destination_index_),
+ storage_manager,
+ op_index_,
+ scheduler_client_id,
+ bus),
+ op_index_);
+ }
}
started_ = true;
return true;
@@ -82,19 +86,22 @@
return true;
}
- for (const block_id input_block_id : input_blocks_) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::UPDATE);
- proto->set_query_id(query_id_);
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ for (const block_id input_block_id : input_blocks_[part_id]) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::UPDATE);
+ proto->set_query_id(query_id_);
- proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
- proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
- proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
- proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
- proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
- proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
+ proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
+ proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
+ proto->SetExtension(serialization::UpdateWorkOrder::partition_id, part_id);
- container->addWorkOrderProto(proto, op_index_);
+ container->addWorkOrderProto(proto, op_index_);
+ }
}
started_ = true;
return true;
@@ -120,6 +127,7 @@
proto.set_block_id(input_block_id_);
proto.set_relation_id(relation_.getID());
proto.set_query_id(query_id_);
+ proto.set_partition_id(partition_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index 8e020d8..fd2096b 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -28,6 +28,7 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "relational_operators/RelationalOperator.hpp"
@@ -87,13 +88,24 @@
const QueryContext::insert_destination_id relocation_destination_index,
const QueryContext::predicate_id predicate_index,
const QueryContext::update_group_id update_group_index)
- : RelationalOperator(query_id),
+ : RelationalOperator(query_id, relation.getNumPartitions()),
relation_(relation),
relocation_destination_index_(relocation_destination_index),
predicate_index_(predicate_index),
update_group_index_(update_group_index),
- input_blocks_(relation.getBlocksSnapshot()),
- started_(false) {}
+ input_blocks_(num_partitions_),
+ started_(false) {
+ if (relation.hasPartitionScheme()) {
+ const PartitionScheme &part_scheme = *relation.getPartitionScheme();
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ input_blocks_[part_id] = part_scheme.getBlocksInPartition(part_id);
+ }
+ } else {
+ DCHECK_EQ(1u, num_partitions_);
+ input_blocks_[0] = relation.getBlocksSnapshot();
+ }
+ }
~UpdateOperator() override {}
@@ -127,7 +139,7 @@
const QueryContext::predicate_id predicate_index_;
const QueryContext::update_group_id update_group_index_;
- const std::vector<block_id> input_blocks_;
+ std::vector<std::vector<block_id>> input_blocks_;
bool started_;
@@ -144,6 +156,7 @@
*
* @param query_id The ID of the query to which this WorkOrder belongs.
* @param relation The relation to perform the UPDATE over.
+ * @param part_id The partition id.
* @param predicate All tuples matching \c predicate will be updated (or NULL
* to update all tuples).
* @param assignments The assignments (the map of attribute_ids to Scalars)
@@ -161,6 +174,7 @@
UpdateWorkOrder(
const std::size_t query_id,
const CatalogRelationSchema &relation,
+ const partition_id part_id,
const block_id input_block_id,
const Predicate *predicate,
const std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>
@@ -170,7 +184,7 @@
const std::size_t update_operator_index,
const tmb::client_id scheduler_client_id,
MessageBus *bus)
- : WorkOrder(query_id),
+ : WorkOrder(query_id, part_id),
relation_(relation),
input_block_id_(input_block_id),
predicate_(predicate),
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 42a0e7d..6dafbe0 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -116,6 +116,7 @@
optional int32 relation_id = 97;
optional int32 predicate_index = 98;
optional fixed64 block_id = 99;
+ optional uint64 partition_id = 100;
}
}
@@ -235,6 +236,7 @@
// All required.
optional fixed64 block_id = 224;
optional bool force = 225;
+ optional uint64 partition_id = 226;
}
}
@@ -320,6 +322,7 @@
optional int32 predicate_index = 323;
optional uint32 update_group_index = 324;
optional fixed64 block_id = 325;
+ optional uint64 partition_id = 326;
}
}
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 68f7f55..5baa21b 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -170,11 +170,16 @@
proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context));
}
case serialization::DELETE: {
- LOG(INFO) << "Creating DeleteWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+ const partition_id part_id =
+ proto.GetExtension(serialization::DeleteWorkOrder::partition_id);
+
+ LOG(INFO) << "Creating DeleteWorkOrder (Partition " << part_id << ") for Query " << query_id
+ << " in Shiftboss " << shiftboss_index;
return new DeleteWorkOrder(
query_id,
catalog_database->getRelationSchemaById(
proto.GetExtension(serialization::DeleteWorkOrder::relation_id)),
+ part_id,
proto.GetExtension(serialization::DeleteWorkOrder::block_id),
query_context->getPredicate(
proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)),
@@ -436,10 +441,14 @@
storage_manager);
}
case serialization::SAVE_BLOCKS: {
- LOG(INFO) << "Creating SaveBlocksWorkOrder for Query " << query_id
+ const partition_id part_id =
+ proto.GetExtension(serialization::SaveBlocksWorkOrder::partition_id);
+
+ LOG(INFO) << "Creating SaveBlocksWorkOrder (Partition " << part_id << ") for Query " << query_id
<< " in Shiftboss " << shiftboss_index;
return new SaveBlocksWorkOrder(
query_id,
+ part_id,
proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id),
proto.GetExtension(serialization::SaveBlocksWorkOrder::force),
storage_manager);
@@ -563,11 +572,16 @@
storage_manager);
}
case serialization::UPDATE: {
- LOG(INFO) << "Creating UpdateWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+ const partition_id part_id =
+ proto.GetExtension(serialization::UpdateWorkOrder::partition_id);
+
+ LOG(INFO) << "Creating UpdateWorkOrder (Partition " << part_id << ") for Query " << query_id
+ << " in Shiftboss " << shiftboss_index;
return new UpdateWorkOrder(
query_id,
catalog_database->getRelationSchemaById(
proto.GetExtension(serialization::UpdateWorkOrder::relation_id)),
+ part_id,
proto.GetExtension(serialization::UpdateWorkOrder::block_id),
query_context->getPredicate(
proto.GetExtension(serialization::UpdateWorkOrder::predicate_index)),
@@ -600,7 +614,7 @@
default:
LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto in Shiftboss" << shiftboss_index;
}
-}
+} // NOLINT(readability/fn_size)
bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
const CatalogDatabaseLite &catalog_database,
@@ -726,7 +740,8 @@
query_context.isValidPredicate(
proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)) &&
proto.HasExtension(serialization::DeleteWorkOrder::block_id) &&
- proto.HasExtension(serialization::DeleteWorkOrder::operator_index);
+ proto.HasExtension(serialization::DeleteWorkOrder::operator_index) &&
+ proto.HasExtension(serialization::DeleteWorkOrder::partition_id);
}
case serialization::DESTROY_AGGREGATION_STATE: {
return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) &&
@@ -885,7 +900,8 @@
}
case serialization::SAVE_BLOCKS: {
return proto.HasExtension(serialization::SaveBlocksWorkOrder::block_id) &&
- proto.HasExtension(serialization::SaveBlocksWorkOrder::force);
+ proto.HasExtension(serialization::SaveBlocksWorkOrder::force) &&
+ proto.HasExtension(serialization::SaveBlocksWorkOrder::partition_id);
}
case serialization::SELECT: {
if (!proto.HasExtension(serialization::SelectWorkOrder::relation_id) ||
@@ -1016,7 +1032,8 @@
query_context.isValidUpdateGroupId(
proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)) &&
proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
- proto.HasExtension(serialization::UpdateWorkOrder::block_id);
+ proto.HasExtension(serialization::UpdateWorkOrder::block_id) &&
+ proto.HasExtension(serialization::UpdateWorkOrder::partition_id);
}
case serialization::WINDOW_AGGREGATION: {
return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&