Fix number of work orders generated for insert multiple tuples. (Also added unit tests)
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7876821..e65f096 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -489,6 +489,22 @@
}
/**
+ * @brief Whether the given vector of Tuple ids is valid.
+ *
+ * @param ids The vector of Tuple ids.
+ *
+ * @return True if valid, otherwise false.
+ **/
+ bool areValidTupleIds(const std::vector<tuple_id> &ids) const {
+ for (const tuple_id id : ids) {
+ if (id >= tuples_.size()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
* @brief Release the ownership of the Tuple referenced by the id.
*
* @note Each id should use only once.
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 14d8949..b0d3c48 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1461,72 +1461,75 @@
*catalog_database_->getRelationById(
input_relation_info->relation->getID());
- for (const std::vector<expressions::ScalarLiteralPtr> &tuple : physical_plan->column_values()) {
- // Construct the tuple proto to be inserted.
- const QueryContext::tuple_id tuple_index = query_context_proto_->tuples_size();
+ // Construct the tuple proto to be inserted.
+ std::vector<QueryContext::tuple_id> tuple_indexes;
+
+ for (const std::vector<expressions::ScalarLiteralPtr> &tuple : physical_plan->column_values()) {
+ const QueryContext::tuple_id tuple_index = query_context_proto_->tuples_size();
S::Tuple *tuple_proto = query_context_proto_->add_tuples();
for (const E::ScalarLiteralPtr &literal : tuple) {
tuple_proto->add_attribute_values()->CopyFrom(literal->value().getProto());
}
-
- // FIXME(qzeng): A better way is using a traits struct to look up whether a storage
- // block supports ad-hoc insertion instead of hard-coding the block types.
- const StorageBlockLayout &storage_block_layout =
- input_relation.getDefaultStorageBlockLayout();
- if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
- TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE ||
- storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
- TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) {
- THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
- << input_relation.getName()
- << ", because its storage blocks do not support ad-hoc insertion";
- }
-
- // Create InsertDestination proto.
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto_->insert_destinations_size();
- S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
-
- insert_destination_proto->set_relation_id(input_relation.getID());
- insert_destination_proto->mutable_layout()->MergeFrom(
- input_relation.getDefaultStorageBlockLayout().getDescription());
-
- if (input_relation.hasPartitionScheme()) {
- insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
- insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
- ->MergeFrom(input_relation.getPartitionScheme()->getProto());
- } else {
- insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
-
- const vector<block_id> blocks(input_relation.getBlocksSnapshot());
- for (const block_id block : blocks) {
- insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
- }
- }
-
- const QueryPlan::DAGNodeIndex insert_operator_index =
- execution_plan_->addRelationalOperator(
- new InsertOperator(query_handle_->query_id(),
- input_relation,
- insert_destination_index,
- tuple_index));
- insert_destination_proto->set_relational_op_index(insert_operator_index);
-
- CatalogRelation *mutable_relation =
- catalog_database_->getRelationByIdMutable(input_relation.getID());
- const QueryPlan::DAGNodeIndex save_blocks_index =
- execution_plan_->addRelationalOperator(
- new SaveBlocksOperator(query_handle_->query_id(), mutable_relation));
- if (!input_relation_info->isStoredRelation()) {
- execution_plan_->addDirectDependency(insert_operator_index,
- input_relation_info->producer_operator_index,
- true /* is_pipeline_breaker */);
- }
- execution_plan_->addDirectDependency(save_blocks_index,
- insert_operator_index,
- false /* is_pipeline_breaker */);
+ tuple_indexes.push_back(tuple_index);
}
+
+ // FIXME(qzeng): A better way is using a traits struct to look up whether a storage
+ // block supports ad-hoc insertion instead of hard-coding the block types.
+ const StorageBlockLayout &storage_block_layout =
+ input_relation.getDefaultStorageBlockLayout();
+ if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
+ TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE ||
+ storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
+ TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) {
+ THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
+ << input_relation.getName()
+ << ", because its storage blocks do not support ad-hoc insertion";
+ }
+
+ // Create InsertDestination proto.
+ const QueryContext::insert_destination_id insert_destination_index =
+ query_context_proto_->insert_destinations_size();
+ S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+
+ insert_destination_proto->set_relation_id(input_relation.getID());
+ insert_destination_proto->mutable_layout()->MergeFrom(
+ input_relation.getDefaultStorageBlockLayout().getDescription());
+
+ if (input_relation.hasPartitionScheme()) {
+ insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+ insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+ ->MergeFrom(input_relation.getPartitionScheme()->getProto());
+ } else {
+ insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
+
+ const vector<block_id> blocks(input_relation.getBlocksSnapshot());
+ for (const block_id block : blocks) {
+ insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
+ }
+ }
+
+ const QueryPlan::DAGNodeIndex insert_operator_index =
+ execution_plan_->addRelationalOperator(
+ new InsertOperator(query_handle_->query_id(),
+ input_relation,
+ insert_destination_index,
+ tuple_indexes));
+ insert_destination_proto->set_relational_op_index(insert_operator_index);
+
+ CatalogRelation *mutable_relation =
+ catalog_database_->getRelationByIdMutable(input_relation.getID());
+ const QueryPlan::DAGNodeIndex save_blocks_index =
+ execution_plan_->addRelationalOperator(
+ new SaveBlocksOperator(query_handle_->query_id(), mutable_relation));
+ if (!input_relation_info->isStoredRelation()) {
+ execution_plan_->addDirectDependency(insert_operator_index,
+ input_relation_info->producer_operator_index,
+ true /* is_pipeline_breaker */);
+ }
+ execution_plan_->addDirectDependency(save_blocks_index,
+ insert_operator_index,
+ false /* is_pipeline_breaker */);
}
void ExecutionGenerator::convertInsertSelection(
diff --git a/query_optimizer/tests/execution_generator/Insert.test b/query_optimizer/tests/execution_generator/Insert.test
index 1be7be9..8131fb2 100644
--- a/query_optimizer/tests/execution_generator/Insert.test
+++ b/query_optimizer/tests/execution_generator/Insert.test
@@ -132,3 +132,18 @@
| 5| 2016-01-01T00:00:00| NULL| abc|
+-----------+-----------------------------------------+------------------------+--------------------------------+
==
+
+CREATE TABLE bar5 (x INT NULL, y INT);
+
+INSERT INTO bar5 VALUES (1,2),(3,4),(5,6);
+
+SELECT * FROM bar5;
+--
++-----------+-----------+
+|x |y |
++-----------+-----------+
+| 1| 2|
+| 3| 4|
+| 5| 6|
++-----------+-----------+
+==
diff --git a/query_optimizer/tests/resolver/Insert.test b/query_optimizer/tests/resolver/Insert.test
index 88fff53..f21ce5c 100644
--- a/query_optimizer/tests/resolver/Insert.test
+++ b/query_optimizer/tests/resolver/Insert.test
@@ -153,3 +153,32 @@
ERROR: Unrecognized relation undefined_table (1 : 13)
insert into undefined_table values (1, 2)
^
+==
+
+insert into test values (null, 1, 2, 3, 'foo', 'foo'),(null, 4, 5, 6, 'foo', 'foo');
+--
+TopLevelPlan
++-plan=InsertTuple
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-column_values=
+| | +-Literal[value=NULL,type=Int NULL]
+| | +-Literal[value=1,type=Long]
+| | +-Literal[value=2,type=Float]
+| | +-Literal[value=3,type=Double NULL]
+| | +-Literal[value=foo,type=Char(20)]
+| | +-Literal[value=foo,type=VarChar(20) NULL]
+| +-column_values=
+| +-Literal[value=NULL,type=Int NULL]
+| +-Literal[value=4,type=Long]
+| +-Literal[value=5,type=Float]
+| +-Literal[value=6,type=Double NULL]
+| +-Literal[value=foo,type=Char(20)]
+| +-Literal[value=foo,type=VarChar(20) NULL]
++-output_attributes=
+ +-[]
diff --git a/relational_operators/InsertOperator.cpp b/relational_operators/InsertOperator.cpp
index fbd3a07..b8c9f07 100644
--- a/relational_operators/InsertOperator.cpp
+++ b/relational_operators/InsertOperator.cpp
@@ -20,6 +20,7 @@
#include "relational_operators/InsertOperator.hpp"
#include <memory>
+#include <vector>
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
@@ -43,12 +44,19 @@
return true;
}
+ std::vector<std::unique_ptr<Tuple>> tuples;
+
+ for (const QueryContext::tuple_id tuple_index : tuple_indexes_) {
+ std::unique_ptr<Tuple> newTuple(query_context->releaseTuple(tuple_index));
+ tuples.push_back(std::move(newTuple));
+ }
+
DCHECK(query_context != nullptr);
container->addNormalWorkOrder(
new InsertWorkOrder(
query_id_,
query_context->getInsertDestination(output_destination_index_),
- query_context->releaseTuple(tuple_index_)),
+ std::move(tuples)),
op_index_);
work_generated_ = true;
@@ -64,7 +72,9 @@
proto->set_work_order_type(serialization::INSERT);
proto->set_query_id(query_id_);
proto->SetExtension(serialization::InsertWorkOrder::insert_destination_index, output_destination_index_);
- proto->SetExtension(serialization::InsertWorkOrder::tuple_index, tuple_index_);
+ for (const QueryContext::tuple_id tuple_index : tuple_indexes_) {
+ proto->AddExtension(serialization::InsertWorkOrder::tuple_indexes, tuple_index);
+ }
container->addWorkOrderProto(proto, op_index_);
@@ -74,7 +84,9 @@
void InsertWorkOrder::execute() {
- output_destination_->insertTuple(*tuple_);
+ for (const auto &tuple : tuples_) {
+ output_destination_->insertTuple(*tuple);
+ }
}
} // namespace quickstep
diff --git a/relational_operators/InsertOperator.hpp b/relational_operators/InsertOperator.hpp
index b103538..3865a7f 100644
--- a/relational_operators/InsertOperator.hpp
+++ b/relational_operators/InsertOperator.hpp
@@ -23,6 +23,7 @@
#include <cstddef>
#include <string>
#include <memory>
+#include <vector>
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
@@ -67,11 +68,11 @@
const std::size_t query_id,
const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index,
- const QueryContext::tuple_id tuple_index)
+ const std::vector<QueryContext::tuple_id> &tuple_indexes)
: RelationalOperator(query_id, 1u, false, output_relation.getNumPartitions()),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
- tuple_index_(tuple_index),
+ tuple_indexes_(tuple_indexes),
work_generated_(false) {}
~InsertOperator() override {}
@@ -103,7 +104,7 @@
private:
const CatalogRelation &output_relation_;
const QueryContext::insert_destination_id output_destination_index_;
- const QueryContext::tuple_id tuple_index_;
+ const std::vector<QueryContext::tuple_id> tuple_indexes_;
bool work_generated_;
DISALLOW_COPY_AND_ASSIGN(InsertOperator);
@@ -125,10 +126,10 @@
**/
InsertWorkOrder(const std::size_t query_id,
InsertDestination *output_destination,
- Tuple *tuple)
+ std::vector<std::unique_ptr<Tuple>> &&tuples)
: WorkOrder(query_id),
output_destination_(DCHECK_NOTNULL(output_destination)),
- tuple_(DCHECK_NOTNULL(tuple)) {}
+ tuples_(std::move(tuples)) {}
~InsertWorkOrder() override {}
@@ -140,7 +141,7 @@
private:
InsertDestination *output_destination_;
- std::unique_ptr<Tuple> tuple_;
+ std::vector<std::unique_ptr<Tuple>> tuples_;
DISALLOW_COPY_AND_ASSIGN(InsertWorkOrder);
};
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index aaf7929..b84e758 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -201,7 +201,7 @@
extend WorkOrder {
// All required.
optional int32 insert_destination_index = 176;
- optional uint32 tuple_index = 177;
+ repeated uint32 tuple_indexes = 177;
}
}
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 3a991bd..7f11e3e 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -395,12 +395,22 @@
}
case serialization::INSERT: {
LOG(INFO) << "Creating InsertWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+
+ const int tuple_count = proto.ExtensionSize(serialization::InsertWorkOrder::tuple_indexes);
+ std::vector<std::unique_ptr<Tuple>> tuple_indexes;
+
+ for (int specific_tuple_index = 0; specific_tuple_index < tuple_count; specific_tuple_index++) {
+ const int tuple_index =
+ proto.GetExtension(serialization::InsertWorkOrder::tuple_indexes, specific_tuple_index);
+ tuple_indexes.emplace_back(
+ std::unique_ptr<Tuple>(query_context->releaseTuple(tuple_index)));
+ }
+
return new InsertWorkOrder(
query_id,
query_context->getInsertDestination(
proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)),
- query_context->releaseTuple(
- proto.GetExtension(serialization::InsertWorkOrder::tuple_index)));
+ std::move(tuple_indexes));
}
case serialization::NESTED_LOOP_JOIN: {
const partition_id part_id =
@@ -852,12 +862,20 @@
proto.HasExtension(serialization::InitializeAggregationWorkOrder::state_partition_id);
}
case serialization::INSERT: {
+ const int tuple_count = proto.ExtensionSize(serialization::InsertWorkOrder::tuple_indexes);
+ std::vector<QueryContext::tuple_id> tuple_indexes;
+
+ for (int specific_tuple_index = 0; specific_tuple_index < tuple_count; specific_tuple_index++) {
+ const int tuple_index =
+ proto.GetExtension(serialization::InsertWorkOrder::tuple_indexes, specific_tuple_index);
+ tuple_indexes.push_back(tuple_index);
+ }
+
return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) &&
query_context.isValidInsertDestinationId(
proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)) &&
- proto.HasExtension(serialization::InsertWorkOrder::tuple_index) &&
- query_context.isValidTupleId(
- proto.GetExtension(serialization::InsertWorkOrder::tuple_index));
+ proto.HasExtension(serialization::InsertWorkOrder::tuple_indexes) &&
+ query_context.areValidTupleIds(tuple_indexes);
}
case serialization::NESTED_LOOP_JOIN: {
if (!proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id) ||