Fix the number of work orders generated when inserting multiple tuples. (Also added unit tests.)
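
Previously, ExecutionGenerator::convertInsertTuple built one InsertDestination,
InsertOperator, and SaveBlocksOperator per row of values, so a multi-row
statement such as INSERT INTO bar5 VALUES (1,2),(3,4),(5,6) (exercised by the
new execution_generator test) produced one work order per tuple. InsertOperator
and InsertWorkOrder now take the full list of tuple ids and insert all of the
tuples through a single work order and a single InsertDestination.
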
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7876821..e65f096 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -489,6 +489,22 @@
   }
 
   /**
+   * @brief Whether the given vector of Tuple ids is valid.
+   *
+   * @param ids The vector of Tuple ids.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool areValidTupleIds(const std::vector<tuple_id> &ids) const {
+    for (const tuple_id id : ids) {
+      if (id >= tuples_.size()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * @brief Release the ownership of the Tuple referenced by the id.
    *
    * @note Each id should use only once.
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 14d8949..b0d3c48 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1461,72 +1461,75 @@
       *catalog_database_->getRelationById(
           input_relation_info->relation->getID());
 
-  for (const std::vector<expressions::ScalarLiteralPtr> &tuple : physical_plan->column_values()) {
-    // Construct the tuple proto to be inserted.
-    const QueryContext::tuple_id tuple_index = query_context_proto_->tuples_size();
 
+  // Construct the tuple protos to be inserted.
+  std::vector<QueryContext::tuple_id> tuple_indexes;
+
+  for (const std::vector<expressions::ScalarLiteralPtr> &tuple : physical_plan->column_values()) {
+    const QueryContext::tuple_id tuple_index = query_context_proto_->tuples_size();
     S::Tuple *tuple_proto = query_context_proto_->add_tuples();
     for (const E::ScalarLiteralPtr &literal : tuple) {
       tuple_proto->add_attribute_values()->CopyFrom(literal->value().getProto());
     }
-
-    // FIXME(qzeng): A better way is using a traits struct to look up whether a storage
-    //               block supports ad-hoc insertion instead of hard-coding the block types.
-    const StorageBlockLayout &storage_block_layout =
-        input_relation.getDefaultStorageBlockLayout();
-    if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
-        TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE ||
-        storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
-              TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) {
-      THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
-                        << input_relation.getName()
-                        << ", because its storage blocks do not support ad-hoc insertion";
-    }
-
-    // Create InsertDestination proto.
-    const QueryContext::insert_destination_id insert_destination_index =
-        query_context_proto_->insert_destinations_size();
-    S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
-
-    insert_destination_proto->set_relation_id(input_relation.getID());
-    insert_destination_proto->mutable_layout()->MergeFrom(
-        input_relation.getDefaultStorageBlockLayout().getDescription());
-
-    if (input_relation.hasPartitionScheme()) {
-      insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
-      insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
-          ->MergeFrom(input_relation.getPartitionScheme()->getProto());
-    } else {
-      insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
-
-      const vector<block_id> blocks(input_relation.getBlocksSnapshot());
-      for (const block_id block : blocks) {
-        insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
-      }
-    }
-
-    const QueryPlan::DAGNodeIndex insert_operator_index =
-        execution_plan_->addRelationalOperator(
-            new InsertOperator(query_handle_->query_id(),
-                               input_relation,
-                               insert_destination_index,
-                               tuple_index));
-    insert_destination_proto->set_relational_op_index(insert_operator_index);
-
-    CatalogRelation *mutable_relation =
-        catalog_database_->getRelationByIdMutable(input_relation.getID());
-    const QueryPlan::DAGNodeIndex save_blocks_index =
-        execution_plan_->addRelationalOperator(
-            new SaveBlocksOperator(query_handle_->query_id(), mutable_relation));
-    if (!input_relation_info->isStoredRelation()) {
-      execution_plan_->addDirectDependency(insert_operator_index,
-                                           input_relation_info->producer_operator_index,
-                                           true /* is_pipeline_breaker */);
-    }
-    execution_plan_->addDirectDependency(save_blocks_index,
-                                         insert_operator_index,
-                                         false /* is_pipeline_breaker */);
+    tuple_indexes.push_back(tuple_index);
   }
+
+  // FIXME(qzeng): A better way is using a traits struct to look up whether a storage
+  //               block supports ad-hoc insertion instead of hard-coding the block types.
+  const StorageBlockLayout &storage_block_layout =
+      input_relation.getDefaultStorageBlockLayout();
+  if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
+      TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE ||
+      storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
+            TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) {
+    THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
+                      << input_relation.getName()
+                      << ", because its storage blocks do not support ad-hoc insertion";
+  }
+
+  // Create InsertDestination proto.
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto_->insert_destinations_size();
+  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+
+  insert_destination_proto->set_relation_id(input_relation.getID());
+  insert_destination_proto->mutable_layout()->MergeFrom(
+      input_relation.getDefaultStorageBlockLayout().getDescription());
+
+  if (input_relation.hasPartitionScheme()) {
+    insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+    insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+        ->MergeFrom(input_relation.getPartitionScheme()->getProto());
+  } else {
+    insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
+
+    const vector<block_id> blocks(input_relation.getBlocksSnapshot());
+    for (const block_id block : blocks) {
+      insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
+    }
+  }
+
+  const QueryPlan::DAGNodeIndex insert_operator_index =
+      execution_plan_->addRelationalOperator(
+          new InsertOperator(query_handle_->query_id(),
+                             input_relation,
+                             insert_destination_index,
+                             tuple_indexes));
+  insert_destination_proto->set_relational_op_index(insert_operator_index);
+
+  CatalogRelation *mutable_relation =
+      catalog_database_->getRelationByIdMutable(input_relation.getID());
+  const QueryPlan::DAGNodeIndex save_blocks_index =
+      execution_plan_->addRelationalOperator(
+          new SaveBlocksOperator(query_handle_->query_id(), mutable_relation));
+  if (!input_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(insert_operator_index,
+                                         input_relation_info->producer_operator_index,
+                                         true /* is_pipeline_breaker */);
+  }
+  execution_plan_->addDirectDependency(save_blocks_index,
+                                       insert_operator_index,
+                                       false /* is_pipeline_breaker */);
 }
 
 void ExecutionGenerator::convertInsertSelection(
diff --git a/query_optimizer/tests/execution_generator/Insert.test b/query_optimizer/tests/execution_generator/Insert.test
index 1be7be9..8131fb2 100644
--- a/query_optimizer/tests/execution_generator/Insert.test
+++ b/query_optimizer/tests/execution_generator/Insert.test
@@ -132,3 +132,18 @@
 |          5|                      2016-01-01T00:00:00|                    NULL|                             abc|
 +-----------+-----------------------------------------+------------------------+--------------------------------+
 ==
+
+CREATE TABLE bar5 (x INT NULL, y INT);
+
+INSERT INTO bar5 VALUES (1,2),(3,4),(5,6);
+
+SELECT * FROM bar5;
+--
++-----------+-----------+
+|x          |y          |
++-----------+-----------+
+|          1|          2|
+|          3|          4|
+|          5|          6|
++-----------+-----------+
+==
diff --git a/query_optimizer/tests/resolver/Insert.test b/query_optimizer/tests/resolver/Insert.test
index 88fff53..f21ce5c 100644
--- a/query_optimizer/tests/resolver/Insert.test
+++ b/query_optimizer/tests/resolver/Insert.test
@@ -153,3 +153,32 @@
 ERROR: Unrecognized relation undefined_table (1 : 13)
 insert into undefined_table values (1, 2)
             ^
+==
+
+insert into test values (null, 1, 2, 3, 'foo', 'foo'),(null, 4, 5, 6, 'foo', 'foo');
+--
+TopLevelPlan
++-plan=InsertTuple
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-column_values=
+| | +-Literal[value=NULL,type=Int NULL]
+| | +-Literal[value=1,type=Long]
+| | +-Literal[value=2,type=Float]
+| | +-Literal[value=3,type=Double NULL]
+| | +-Literal[value=foo,type=Char(20)]
+| | +-Literal[value=foo,type=VarChar(20) NULL]
+| +-column_values=
+|   +-Literal[value=NULL,type=Int NULL]
+|   +-Literal[value=4,type=Long]
+|   +-Literal[value=5,type=Float]
+|   +-Literal[value=6,type=Double NULL]
+|   +-Literal[value=foo,type=Char(20)]
+|   +-Literal[value=foo,type=VarChar(20) NULL]
++-output_attributes=
+  +-[]
diff --git a/relational_operators/InsertOperator.cpp b/relational_operators/InsertOperator.cpp
index fbd3a07..b8c9f07 100644
--- a/relational_operators/InsertOperator.cpp
+++ b/relational_operators/InsertOperator.cpp
@@ -20,6 +20,7 @@
 #include "relational_operators/InsertOperator.hpp"
 
 #include <memory>
+#include <vector>
 
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
@@ -43,12 +44,19 @@
     return true;
   }
 
   DCHECK(query_context != nullptr);
+
+  std::vector<std::unique_ptr<Tuple>> tuples;
+  for (const QueryContext::tuple_id tuple_index : tuple_indexes_) {
+    std::unique_ptr<Tuple> tuple(query_context->releaseTuple(tuple_index));
+    tuples.push_back(std::move(tuple));
+  }
+
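+  // A single InsertWorkOrder carries all of the tuples for this INSERT.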
   container->addNormalWorkOrder(
       new InsertWorkOrder(
           query_id_,
           query_context->getInsertDestination(output_destination_index_),
-          query_context->releaseTuple(tuple_index_)),
+          std::move(tuples)),
       op_index_);
 
   work_generated_ = true;
@@ -64,7 +72,9 @@
   proto->set_work_order_type(serialization::INSERT);
   proto->set_query_id(query_id_);
   proto->SetExtension(serialization::InsertWorkOrder::insert_destination_index, output_destination_index_);
-  proto->SetExtension(serialization::InsertWorkOrder::tuple_index, tuple_index_);
+  for (const QueryContext::tuple_id tuple_index : tuple_indexes_) {
+    proto->AddExtension(serialization::InsertWorkOrder::tuple_indexes, tuple_index);
+  }
 
   container->addWorkOrderProto(proto, op_index_);
 
@@ -74,7 +84,9 @@
 
 
 void InsertWorkOrder::execute() {
-  output_destination_->insertTuple(*tuple_);
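+  // Insert every tuple owned by this work order into the destination.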
+  for (const auto &tuple : tuples_) {
+    output_destination_->insertTuple(*tuple);
+  }
 }
 
 }  // namespace quickstep
diff --git a/relational_operators/InsertOperator.hpp b/relational_operators/InsertOperator.hpp
index b103538..3865a7f 100644
--- a/relational_operators/InsertOperator.hpp
+++ b/relational_operators/InsertOperator.hpp
@@ -23,6 +23,7 @@
 #include <cstddef>
 #include <string>
 #include <memory>
+#include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
@@ -67,11 +68,11 @@
       const std::size_t query_id,
       const CatalogRelation &output_relation,
       const QueryContext::insert_destination_id output_destination_index,
-      const QueryContext::tuple_id tuple_index)
+      const std::vector<QueryContext::tuple_id> &tuple_indexes)
       : RelationalOperator(query_id, 1u, false, output_relation.getNumPartitions()),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
-        tuple_index_(tuple_index),
+        tuple_indexes_(tuple_indexes),
         work_generated_(false) {}
 
   ~InsertOperator() override {}
@@ -103,7 +104,7 @@
  private:
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
-  const QueryContext::tuple_id tuple_index_;
+  const std::vector<QueryContext::tuple_id> tuple_indexes_;
   bool work_generated_;
 
   DISALLOW_COPY_AND_ASSIGN(InsertOperator);
@@ -125,10 +126,10 @@
    **/
   InsertWorkOrder(const std::size_t query_id,
                   InsertDestination *output_destination,
-                  Tuple *tuple)
+                  std::vector<std::unique_ptr<Tuple>> &&tuples)
       : WorkOrder(query_id),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        tuple_(DCHECK_NOTNULL(tuple)) {}
+        tuples_(std::move(tuples)) {}
 
   ~InsertWorkOrder() override {}
 
@@ -140,7 +141,7 @@
 
  private:
   InsertDestination *output_destination_;
-  std::unique_ptr<Tuple> tuple_;
+  std::vector<std::unique_ptr<Tuple>> tuples_;
 
   DISALLOW_COPY_AND_ASSIGN(InsertWorkOrder);
 };
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index aaf7929..b84e758 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -201,7 +201,7 @@
   extend WorkOrder {
     // All required.
     optional int32 insert_destination_index = 176;
-    optional uint32 tuple_index = 177;
+    repeated uint32 tuple_indexes = 177;
   }
 }
 
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 3a991bd..7f11e3e 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -395,12 +395,22 @@
     }
     case serialization::INSERT: {
       LOG(INFO) << "Creating InsertWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+
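+      // Re-acquire ownership of the tuples referenced by this work order's proto.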
+      const int tuple_count = proto.ExtensionSize(serialization::InsertWorkOrder::tuple_indexes);
+      std::vector<std::unique_ptr<Tuple>> tuples;
+
+      for (int specific_tuple_index = 0; specific_tuple_index < tuple_count; ++specific_tuple_index) {
+        const int tuple_index =
+            proto.GetExtension(serialization::InsertWorkOrder::tuple_indexes, specific_tuple_index);
+        tuples.emplace_back(query_context->releaseTuple(tuple_index));
+      }
+
       return new InsertWorkOrder(
           query_id,
           query_context->getInsertDestination(
               proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)),
-          query_context->releaseTuple(
-              proto.GetExtension(serialization::InsertWorkOrder::tuple_index)));
+          std::move(tuples));
     }
     case serialization::NESTED_LOOP_JOIN: {
       const partition_id part_id =
@@ -852,12 +862,20 @@
              proto.HasExtension(serialization::InitializeAggregationWorkOrder::state_partition_id);
     }
     case serialization::INSERT: {
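+      // Collect all tuple ids from the repeated extension so they can be validated together.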
+      const int tuple_count = proto.ExtensionSize(serialization::InsertWorkOrder::tuple_indexes);
+      std::vector<QueryContext::tuple_id> tuple_indexes;
+
+      for (int specific_tuple_index = 0; specific_tuple_index < tuple_count; specific_tuple_index++) {
+        const int tuple_index =
+            proto.GetExtension(serialization::InsertWorkOrder::tuple_indexes, specific_tuple_index);
+        tuple_indexes.push_back(tuple_index);
+      }
+
       return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) &&
              query_context.isValidInsertDestinationId(
                  proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)) &&
-             proto.HasExtension(serialization::InsertWorkOrder::tuple_index) &&
-             query_context.isValidTupleId(
-                 proto.GetExtension(serialization::InsertWorkOrder::tuple_index));
+             tuple_count > 0 &&
+             query_context.areValidTupleIds(tuple_indexes);
     }
     case serialization::NESTED_LOOP_JOIN: {
       if (!proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id) ||