Optimize RowStore to PackedRowStore bulk insert
diff --git a/storage/PackedRowStoreTupleStorageSubBlock.cpp b/storage/PackedRowStoreTupleStorageSubBlock.cpp
index ef83a29..ba747d3 100644
--- a/storage/PackedRowStoreTupleStorageSubBlock.cpp
+++ b/storage/PackedRowStoreTupleStorageSubBlock.cpp
@@ -204,68 +204,257 @@
   return header_->num_tuples - original_num_tuples;
 }
 
-tuple_id PackedRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
+// Unnamed namespace for helper functions that are implementation details and
+// need not be exposed in the interface of the class (i.e., in the *.hpp file).
+//
+// The first helper function provides an optimized bulk-insertion path from
+// row-store source blocks into this PackedRowStore block, in which runs of
+// contiguous attributes are copied together. For uniformity, a second helper
+// produces semantically identical `runs` for other source layouts as well.
+namespace {
+enum RunType {
+  kContiguousAttributes,
+  kNullableAttribute,
+};
+
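+// Describes one copy "run": either a stretch of contiguous, non-nullable
+// source attributes that can be copied with a single memcpy, or a single
+// nullable attribute that must be checked for NULL individually.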
+struct RunInfo {
+  RunType run_type_;
+  attribute_id mapped_attr_id_;  // First source attribute id in the run.
+  std::size_t size_;             // Total run size, in bytes.
+  int nullable_attr_idx_;        // Destination null-bitmap index, if nullable.
+
+  static const int invalid_nullable_attr_idx_ = -1;
+
+  RunInfo(RunType run_type, attribute_id mapped_attr_id, std::size_t size,
+          int nullable_attr_idx)
+      : run_type_(run_type),
+        mapped_attr_id_(mapped_attr_id),
+        size_(size),
+        nullable_attr_idx_(nullable_attr_idx) {}
+
+  static RunInfo getContiguousAttrsRun(
+      attribute_id mapped_attr_id, std::size_t size) {
+    return RunInfo(kContiguousAttributes, mapped_attr_id,
+                   size, invalid_nullable_attr_idx_);
+  }
+
+  static RunInfo getNullableAttrRun(
+      attribute_id mapped_attr_id, std::size_t size, int nullable_attr_idx) {
+    return RunInfo(kNullableAttribute, mapped_attr_id,
+                   size, nullable_attr_idx);
+  }
+};
+
+// This helper function examines the destination schema and the attribute map,
+// and determines runs of attributes that can be copied at once. Since the
+// destination is a PackedRowStore, its attributes are always laid out
+// contiguously in attribute-id order, so only the source side needs checking.
+//
+// For each run of contiguous non-nullable attributes in the attribute map,
+// `runs` receives one kContiguousAttributes entry whose mapped_attr_id_ is the
+// first source attribute id of the run and whose size_ is the total size
+// (i.e., number of bytes) of all attributes in the run.
+//
+// For 4-byte integer attrs, with attribute_map {0,2,5,6,7,3,4,10},
+// `runs` will be {(0,4), (2,4), (5,12), (3,8), (10,4)}.
+//
+// Nullable attributes also break runs, just like non-contiguous attributes.
+// Each one is recorded in `runs` as its own kNullableAttribute entry, carrying
+// its size and its index into the destination null bitmap.
+// In the above example, if the destination attribute mapped to 6 were nullable,
+// `runs` would be {(0,4), (2,4), (5,4), (6,4), (7,4), (3,8), (10,4)}, with the
+// (6,4) entry marked as a nullable-attribute run.
+void getRunsForRowLayoutHelper(
+    const CatalogRelationSchema &relation,
     const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor) {
+    const std::vector<std::size_t> &attrs_max_size,
+    std::vector<RunInfo> &runs) {
+  std::size_t num_attrs = attribute_map.size();
+  std::size_t curr_run_size = 0;
+  std::size_t curr_run_start = 0;
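+  // A nonzero curr_run_size indicates an ongoing run that began at
+  // (destination) attribute index curr_run_start.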
+  for (std::size_t curr_attr = 0; curr_attr < num_attrs; ++curr_attr) {
+    int null_idx = relation.getNullableAttributeIndex(curr_attr);
+    if (null_idx == kInvalidCatalogId) {
+      if (curr_run_size != 0) {
+        if (attribute_map[curr_attr] == 1 + attribute_map[curr_attr - 1]) {
+          // If curr_attr is non-nullable,
+          // ... and if there is an ongoing run,
+          // ... and if curr_attr is adjacent to the previous attribute,
+          // ... then we continue the current run.
+          curr_run_size += attrs_max_size[curr_attr];
+        } else {
+          // If curr_attr is non-nullable
+          // ... and there is an ongoing run,
+          // ... but curr_attr is not adjacent to the previous attribute,
+          // ... then we end the current run,
+          runs.push_back(RunInfo::getContiguousAttrsRun(
+              attribute_map[curr_run_start], curr_run_size));
+
+          // ... and begin a new run.
+          curr_run_size = attrs_max_size[curr_attr];
+          curr_run_start = curr_attr;
+        }
+      } else {
+        // If curr_attr is non-nullable, ...
+        // ... but there is no ongoing run,
+        // ... then we begin a new run.
+        curr_run_size = attrs_max_size[curr_attr];
+        curr_run_start = curr_attr;
+      }
+    } else {
+      if (curr_run_size != 0) {
+        // If curr_attr is nullable,
+        // ... and there is an ongoing run,
+        // ... then we end the current run,
+        runs.push_back(RunInfo::getContiguousAttrsRun(
+            attribute_map[curr_run_start], curr_run_size));
+        curr_run_size = 0;
+
+        // ... and record the nullability of curr_attr as below.
+      } else {
+        // If curr_attr is nullable,
+        // ... and there is no ongoing run,
+        // ... we just have to record the nullability of curr_attr as below.
+        // (Nothing to do here.)
+      }
+
+      // We must record that curr_attr is nullable, by inserting it into `runs`
+      runs.push_back(RunInfo::getNullableAttrRun(
+          attribute_map[curr_attr], attrs_max_size[curr_attr], null_idx));
+    }
+  } // end for-loop on attributes in attribute_map
+
+  if (curr_run_size != 0) {
+    // If there is an ongoing run, then we end it.
+    runs.push_back(RunInfo::getContiguousAttrsRun(
+        attribute_map[curr_run_start], curr_run_size));
+  }
+}
+
+// (See the comments for the function above for context.)
+// For other source layouts (not row stores), attributes with successive
+// attribute IDs are not necessarily stored contiguously, so we simply make
+// every attribute a run of its own size. As above, nullable attributes are
+// recorded as kNullableAttribute runs.
+void getRunsForOtherLayoutHelper(
+    const CatalogRelationSchema &relation,
+    const std::vector<attribute_id> &attribute_map,
+    const std::vector<std::size_t> &attrs_max_size,
+    std::vector<RunInfo> &runs) {
+  std::size_t num_attrs = attribute_map.size();
+  for (std::size_t curr_attr = 0; curr_attr < num_attrs; ++curr_attr) {
+    int null_idx = relation.getNullableAttributeIndex(curr_attr);
+    if (null_idx == kInvalidCatalogId) {
+      runs.push_back(RunInfo::getContiguousAttrsRun(
+          attribute_map[curr_attr], attrs_max_size[curr_attr]));
+    } else {
+      runs.push_back(RunInfo::getNullableAttrRun(
+          attribute_map[curr_attr], attrs_max_size[curr_attr], null_idx));
+    }
+  }
+}
+}  // end unnamed namespace
+
+template <bool nullable_attrs> tuple_id
+PackedRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributesHelper(
+    const std::vector<attribute_id> &attribute_map, ValueAccessor *accessor) {
   DEBUG_ASSERT(attribute_map.size() == relation_.size());
 
   const tuple_id original_num_tuples = header_->num_tuples;
-  char *dest_addr = static_cast<char*>(tuple_storage_)
-                      + header_->num_tuples * relation_.getFixedByteLength();
+  char *dest_addr = static_cast<char *>(tuple_storage_) +
+                    header_->num_tuples * relation_.getFixedByteLength();
   const unsigned num_nullable_attrs = relation_.numNullableAttributes();
+  const std::vector<std::size_t> &attrs_max_size =
+      relation_.getMaximumAttributeByteLengths();
+
+  std::vector<RunInfo> runs;
+  runs.reserve(attribute_map.size());
+
+  // We have an optimized implementation for row-store-to-row-store bulk
+  // insert. For other source layouts, we create one run per attribute so the
+  // same insertion code can be used uniformly.
+  auto impl = accessor->getImplementationType();
+  if (impl == ValueAccessor::Implementation::kPackedRowStore ||
+      impl == ValueAccessor::Implementation::kSplitRowStore) {
+    getRunsForRowLayoutHelper(relation_, attribute_map, attrs_max_size, runs);
+  } else {
+    getRunsForOtherLayoutHelper(relation_, attribute_map, attrs_max_size, runs);
+  }
 
   InvokeOnAnyValueAccessor(
-      accessor,
-      [this, &num_nullable_attrs, &attribute_map, &dest_addr](auto *accessor) -> void {  // NOLINT(build/c++11)
-    const std::size_t num_attrs = relation_.size();
-    const std::vector<std::size_t> &attrs_max_size =
-        relation_.getMaximumAttributeByteLengths();
+    accessor,
+    [this, &num_nullable_attrs, &dest_addr, &runs, &attrs_max_size]
+    (auto *accessor) -> void {  // NOLINT(build/c++11)
 
-    if (num_nullable_attrs != 0) {
-      while (this->hasSpaceToInsert<true>(1) && accessor->next()) {
-        for (std::size_t curr_attr = 0; curr_attr < num_attrs; ++curr_attr) {
-          const std::size_t attr_size = attrs_max_size[curr_attr];
-          const attribute_id nullable_idx = relation_.getNullableAttributeIndex(curr_attr);
-          // If this attribute is nullable, check for a returned null value.
-          if (nullable_idx != kInvalidCatalogId) {
-            const void *attr_value
-                = accessor->template getUntypedValue<true>(attribute_map[curr_attr]);
-            if (attr_value == nullptr) {
-              null_bitmap_->setBit(
-                  header_->num_tuples * num_nullable_attrs + nullable_idx,
-                  true);
-            } else {
-              memcpy(dest_addr, attr_value, attr_size);
-            }
-          } else {
+      // Inner lambda that inserts one tuple.
+      // Defining a lambda here reduces code duplication. It can't be a
+      // separate function because getUntypedValue() is only a member of the
+      // downcasted ValueAccessor subtypes, not of the base ValueAccessor
+      // type, so the inner lambda has to be nested inside the outer lambda.
+      auto insertOneTupleUsingRuns =
+          [this, &accessor, &runs, &dest_addr, num_nullable_attrs]() -> void {
+        for (auto &run : runs) {
+          if (!nullable_attrs
+              || run.run_type_ == RunType::kContiguousAttributes) {
             memcpy(dest_addr,
-                   accessor->template getUntypedValue<false>(attribute_map[curr_attr]),
-                   attr_size);
+                   accessor->template getUntypedValue<false>(run.mapped_attr_id_),
+                   run.size_);
+            dest_addr += run.size_;
+          } else {
+            // This attribute is nullable; check whether its value is NULL.
+            const void *attr_value =
+                accessor->template getUntypedValue<true>(run.mapped_attr_id_);
+            if (attr_value != nullptr) {
+              memcpy(dest_addr, attr_value, run.size_);
+            } else {
+              this->null_bitmap_->setBit(
+                  this->header_->num_tuples * num_nullable_attrs
+                      + run.nullable_attr_idx_,
+                  true);
+            }
+            // In either case, advance dest_addr (leaving the slot's space
+            // unwritten for NULL values).
+            dest_addr += run.size_;
           }
-          dest_addr += attr_size;
         }
-        ++(header_->num_tuples);
+        ++(this->header_->num_tuples);
+      }; // end (inner) lambda: insertOneTupleUsingRuns
+
+      // Insert one tuple at a time into this subblock using the lambda above.
+      // The loop is split in two so that insertion happens in batches, making
+      // fewer calls to the somewhat expensive hasSpaceToInsert().
+      //
+      // Note the order of the test conditions in the inner for-loop. That
+      // loop terminates either when i == kNumTuplesInBatch, in which case the
+      // accessor has not been advanced past the last inserted tuple, or when
+      // accessor->next() returns false, in which case all tuples have been
+      // inserted. So no tuple can be missed between the two loops.
+      const unsigned kNumTuplesInBatch = 1000;
+      while (this->hasSpaceToInsert<nullable_attrs>(kNumTuplesInBatch)
+             && !accessor->iterationFinished()) {
+        for (unsigned i = 0; i < kNumTuplesInBatch && accessor->next(); ++i) {
+          insertOneTupleUsingRuns();
+        }
       }
-    } else {
-      while (this->hasSpaceToInsert<false>(1) && accessor->next()) {
-        for (std::size_t curr_attr = 0; curr_attr < num_attrs; ++curr_attr) {
-          const std::size_t attr_size = attrs_max_size[curr_attr];
-          memcpy(dest_addr,
-                 accessor->template getUntypedValue<false>(attribute_map[curr_attr]),
-                 attr_size);
-          dest_addr += attr_size;
-        }
-        ++(header_->num_tuples);
-      }
-    }
-  });
+      while (this->hasSpaceToInsert<nullable_attrs>(1) && accessor->next()) {
+        insertOneTupleUsingRuns();
+      }
+    });  // End (outer) lambda: argument to InvokeOnAnyValueAccessor
 
   return header_->num_tuples - original_num_tuples;
 }
 
-const void* PackedRowStoreTupleStorageSubBlock::getAttributeValue(
-    const tuple_id tuple,
-    const attribute_id attr) const {
+tuple_id
+PackedRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
+    const std::vector<attribute_id> &attribute_map, ValueAccessor *accessor) {
+  const unsigned num_nullable_attrs = relation_.numNullableAttributes();
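+  // Dispatch to the templated helper: the non-nullable instantiation skips
+  // the per-run NULL checks and null-bitmap updates entirely.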
+  if (num_nullable_attrs > 0) {
+    return bulkInsertTuplesWithRemappedAttributesHelper<true>(
+        attribute_map, accessor);
+  }
+  return bulkInsertTuplesWithRemappedAttributesHelper<false>(
+      attribute_map, accessor);
+}
+
+const void *PackedRowStoreTupleStorageSubBlock::getAttributeValue(
+    const tuple_id tuple, const attribute_id attr) const {
   DEBUG_ASSERT(hasTupleWithID(tuple));
   DEBUG_ASSERT(relation_.hasAttributeWithId(attr));
 
diff --git a/storage/PackedRowStoreTupleStorageSubBlock.hpp b/storage/PackedRowStoreTupleStorageSubBlock.hpp
index 9a208dc..302e294 100644
--- a/storage/PackedRowStoreTupleStorageSubBlock.hpp
+++ b/storage/PackedRowStoreTupleStorageSubBlock.hpp
@@ -198,6 +198,13 @@
   // relations that have no nullable attributes (and therefore no NULL-bitmap).
   template <bool nullable_attrs>
   bool hasSpaceToInsert(const tuple_id num_tuples) const;
+
+  // Helper for bulkInsertTuplesWithRemappedAttributes(), templated on the
+  // 'nullable_attrs' flag (with the same semantics as in the functions
+  // above).
+  template <bool nullable_attrs>
+  tuple_id bulkInsertTuplesWithRemappedAttributesHelper(
+      const std::vector<attribute_id> &attribute_map, ValueAccessor *accessor);
 
   PackedRowStoreHeader *header_;
   std::unique_ptr<BitVector<false>> null_bitmap_;