tablet: allow interleaving of row liveness between compaction input rows

It was previously true that when merging multiple rowsets, a row's
liveness always moved to the same or newer rowsets. I.e., if a row was
deleted and inserted again, the new insert would either land in the same
rowset from which it was deleted (e.g. if deleting from the MRS and
inserting back into the same MRS), or a rowset whose rows are newer than
the rowset being deleted from (e.g. if deleting from a DRS and inserting
into the MRS).

This invariant was upheld by various checks in compaction code. The
invariant is no longer true when considering transactional inserts.
Take the following example:
- ts=10: insert 'a' to the tablet's MRS
- ts=11: delete 'a' from the tablet's MRS
- begin txn
  - insert 'a' to a transactional MRS
  - ts=12: commit the transactional MRS
- ts=13: delete 'a' from the transactional MRS
- ts=14: insert 'a' to the tablet's MRS

Considering the row history for 'a' when merging these rowsets, we're
left with the following row histories in each rowset, which serve as
inputs to our history-merging code:

  tablet MRS: UNDO(del@10) <- 'a' -> REDO(del@11) -> REDO(reins@14)
  txn MRS:    UNDO(del@12) <- 'a' -> REDO(del@13)

Previously, our invariant meant that one of these rows' entire history
(both UNDOs and REDOs) was entirely ahead of the other's. Merging was
then a matter of determining which input is older (which must be a
deleted "ghost" row, since there can only be a single live row at a
time), converting the older row and its history into UNDOs, and merging
that UNDO history with the newer row's UNDOs. In the above sequence,
though, determining the older row isn't as straightforward because the
histories overlap. This led to broken invariants in the form of CHECK
failures or incorrect results.

However, a notable observation is that there _is_ a discernible history
that does abide by our previously held invariant, if we transfer the
newer REDOs from the tablet's MRS input to the txn's MRS input:

  UNDO(del@10) <- 'a' -> REDO(del@11)
  UNDO(del@12) <- 'a' -> REDO(del@13) -> REDO(reins@14)

This transferral is safe because we still expect that only a single
instance of a row can be "live" at a time. I.e., if there is such
overlap, it is caused by the deletion of a row from the older rowset,
and in transferring the history, there is no risk of ending up with two
histories for the same row that both end with a live row.

This patch implements this transferral of history onto the newer input,
and adds some fuzz test cases that helped flush out the aforementioned
CHECK failures and incorrect results.

It also enables transactional ops in fuzz-itest, which helped find this
issue. Without this patch, fuzz-itest with transactional support fails
2/10 times in DEBUG mode. With it, it has passed 1000/1000 times.

Change-Id: I042a7d70d32edf9d2a3a077790821893f162880a
Reviewed-on: http://gerrit.cloudera.org:8080/16752
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Tested-by: Andrew Wong <awong@cloudera.com>
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index 01c0b40..499454f 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -259,8 +259,6 @@
   optional<int32_t> val;
 };
 
-// TODO(awong): Merging multiple transactional MRSs together can sometimes lead
-// to a crash. Uncomment the transactional ops once fixed.
 const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_INSERT_PK_ONLY,
                                   TEST_INSERT_IGNORE,
@@ -279,15 +277,13 @@
                                   TEST_COMPACT_TABLET,
                                   TEST_RESTART_TS,
                                   TEST_SCAN_AT_TIMESTAMP,
-                                  TEST_DIFF_SCAN};
-                                  // TEST_BEGIN_TXN,
-                                  // TEST_COMMIT_TXN,
-                                  // TEST_ABORT_TXN};
+                                  TEST_DIFF_SCAN,
+                                  TEST_BEGIN_TXN,
+                                  TEST_COMMIT_TXN,
+                                  TEST_ABORT_TXN};
 
 // Ops that focus on hammering workloads in which rows come in and out of
 // existence.
-// TODO(awong): Merging multiple transactional MRSs together can sometimes lead
-// to a crash. Uncomment the transactional ops once fixed.
 const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                      TEST_INSERT_IGNORE_PK_ONLY,
                                      TEST_UPSERT_PK_ONLY,
@@ -301,10 +297,10 @@
                                      TEST_COMPACT_TABLET,
                                      TEST_RESTART_TS,
                                      TEST_SCAN_AT_TIMESTAMP,
-                                     TEST_DIFF_SCAN};
-                                     // TEST_BEGIN_TXN,
-                                     // TEST_COMMIT_TXN,
-                                     // TEST_ABORT_TXN};
+                                     TEST_DIFF_SCAN,
+                                     TEST_BEGIN_TXN,
+                                     TEST_COMMIT_TXN,
+                                     TEST_ABORT_TXN};
 
 // Test which does only random operations against a tablet, including update and random
 // get (ie scans with equal lower and upper bounds).
@@ -534,12 +530,12 @@
           }
           ExpectedKeyValueRow found_val = rows_found[found_idx++];
           if (expected_val.key != found_val.key) {
-            errors->push_back(Substitute("Mismached key. Expected: $0 Found: $1",
+            errors->push_back(Substitute("Mismatched key. Expected: $0 Found: $1",
                                          expected_val.ToString(), found_val.ToString()));
             continue;
           }
           if (expected_val.val != found_val.val) {
-            errors->push_back(Substitute("Mismached value. Expected: $0 Found: $1",
+            errors->push_back(Substitute("Mismatched value. Expected: $0 Found: $1",
                                          expected_val.ToString(), found_val.ToString()));
             continue;
           }
@@ -1809,19 +1805,23 @@
 TEST_F(FuzzTest, TestReplayDeletesOnTxnRowsets) {
   CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
   RunFuzzCase({
+      // Insert to the main MRS.
       {TEST_INSERT_PK_ONLY, 1, -1},
       {TEST_FLUSH_OPS, -1},
       {TEST_FLUSH_TABLET},
 
+      // Insert into a transactional MRS.
       {TEST_BEGIN_TXN, 2},
       {TEST_INSERT_IGNORE_PK_ONLY, 0, 2},
       {TEST_FLUSH_OPS, 2},
       {TEST_COMMIT_TXN, 2},
 
+      // Delete the rows in both MRSs.
       {TEST_DELETE, 0},
       {TEST_DELETE, 1},
       {TEST_FLUSH_OPS, -1},
 
+      // We should be able to restart without issues.
       {TEST_FLUSH_DELTAS},
       {TEST_RESTART_TS},
     });
@@ -1830,11 +1830,13 @@
 TEST_F(FuzzTest, TestFlushMRSsWithInvisibleRows) {
   CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
   RunFuzzCase({
+    // Insert into a transactional MRS.
     {TEST_BEGIN_TXN, 0},
     {TEST_INSERT_IGNORE, 1, 0},
     {TEST_FLUSH_OPS, 0},
     {TEST_COMMIT_TXN, 0},
 
+    // Insert into the main MRS and delete it in the same batch.
     {TEST_INSERT_PK_ONLY, 0, -1},
     {TEST_INSERT_IGNORE_PK_ONLY, 0, -1},
     {TEST_DELETE, 0},
@@ -1843,8 +1845,45 @@
     {TEST_RESTART_TS},
     {TEST_MAJOR_COMPACT_DELTAS},
 
+    // Delete the row in the transactional MRS.
     {TEST_DELETE, 1},
     {TEST_FLUSH_OPS, -1},
+
+    // Flush the tablet, merging the MRSs, one of which has an invisible row.
+    {TEST_FLUSH_TABLET},
+  });
+}
+
+// Test that when the newer row's redo head and older row's redo tail have the
+// same timestamp, we transfer the newest non-delete REDOs onto the newer row,
+// since if the row is alive, it must be alive as the newer row.
+TEST_F(FuzzTest, TestOlderRowsetGetsReinsertWhenNewerGetsDeleteInSameOp) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+    // Insert and delete a row from the main MRS.
+    {TEST_INSERT, 1},
+    {TEST_FLUSH_OPS, -1},
+    {TEST_DELETE, 1},
+    {TEST_FLUSH_OPS, -1},
+
+    // Insert into a transactional MRS.
+    {TEST_BEGIN_TXN, 25},
+    {TEST_INSERT_IGNORE, 1, 25},
+    {TEST_FLUSH_OPS, 25},
+    {TEST_COMMIT_TXN, 25},
+
+    // Delete from the transactional MRS and insert back to the main MRS.
+    {TEST_DELETE, 1},
+    {TEST_FLUSH_OPS, -1},
+    {TEST_INSERT, 1},
+    {TEST_FLUSH_OPS, -1},
+
+    // Merging the history of this row, the inputs are:
+    // tablet MRS:  UNDO(del@t1) <- BASE -> REDO(del@t2) -> REDO(reins@t4)
+    // txn MRS:     UNDO(del@t3) <- BASE -> REDO(del@t4)
+    //
+    // A contiguous history should be generated by transferring the last REDO
+    // in the tablet MRS onto the txn MRS.
     {TEST_FLUSH_TABLET},
   });
 }
@@ -1869,6 +1908,174 @@
   });
 }
 
+// Test that a strictly older rowset whose row has been deleted can be
+// reinserted to after already inserting into and deleting from a newer rowset.
+TEST_F(FuzzTest, TestOlderRowsetGetsNewerInsertAndDelete) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+    // Insert two rows into the main MRS.
+    {TEST_INSERT, 1},
+    {TEST_INSERT, 0},
+    {TEST_FLUSH_OPS, -1},
+
+    {TEST_UPDATE, 0},
+    {TEST_FLUSH_OPS, -1},
+
+    // Delete one of the rows from the main MRS.
+    {TEST_DELETE, 0},
+    {TEST_FLUSH_OPS, -1},
+
+    // Insert into a transactional MRS.
+    {TEST_BEGIN_TXN, 25},
+    {TEST_INSERT_IGNORE_PK_ONLY, 0, 25},
+    {TEST_FLUSH_OPS, 25},
+    {TEST_COMMIT_TXN, 25},
+
+    // Delete from the transactional MRS and insert to the main MRS.
+    {TEST_DELETE, 0},
+    {TEST_INSERT, 0},
+    {TEST_UPDATE, 0},
+    {TEST_DELETE, 0},
+    {TEST_FLUSH_OPS, -1},
+
+    // Merging the history of this row, the inputs are:
+    // tablet MRS: UNDO(del@t1) <- BASE -> REDO(upd@t2) -> REDO(del@t3) -> REDO(reins@t5)
+    //                                      -> REDO(upd@t5) -> REDO(del@t5)
+    // txn MRS:    UNDO(del@t4) <- BASE -> REDO(del@t5)
+    // where: t1=5, t2=7, t3=9, t4=13, t5=19
+    {TEST_FLUSH_TABLET},
+
+    // Verify correctness by scanning at the boundaries of the mutations.
+    {TEST_SCAN_AT_TIMESTAMP, 5},
+    {TEST_SCAN_AT_TIMESTAMP, 6},
+    {TEST_SCAN_AT_TIMESTAMP, 7},
+    {TEST_SCAN_AT_TIMESTAMP, 8},
+    {TEST_SCAN_AT_TIMESTAMP, 9},
+    {TEST_SCAN_AT_TIMESTAMP, 10},
+    {TEST_SCAN_AT_TIMESTAMP, 13},
+    {TEST_SCAN_AT_TIMESTAMP, 14},
+    {TEST_SCAN_AT_TIMESTAMP, 19},
+    {TEST_SCAN_AT_TIMESTAMP, 20},
+  });
+}
+
+// Test when the REDO histories are tangential to one another but don't
+// overlap.
+TEST_F(FuzzTest, TestMergeTangentialNonOverlappingRedoHistory) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+    {TEST_BEGIN_TXN, 25},
+    {TEST_INSERT, 1, 25},
+    {TEST_FLUSH_OPS, 25},
+    {TEST_COMMIT_TXN, 25},
+
+    // Have some REDOs land in the transactional MRS.
+    {TEST_UPDATE, 1},
+    {TEST_FLUSH_OPS, -1},
+
+    {TEST_UPDATE, 1},
+    {TEST_DELETE, 1},
+
+    // Have an insert, delete, and reinsert land in the main MRS at the
+    // same timestamp.
+    {TEST_INSERT, 1},
+    {TEST_DELETE, 1},
+    {TEST_INSERT, 1},
+    {TEST_FLUSH_OPS, -1},
+
+    // Finally, delete the row at another timestamp.
+    {TEST_DELETE, 1},
+    {TEST_FLUSH_OPS, -1},
+
+    // Merging the history of this row, the inputs are:
+    // txn MRS:    UNDO(del@t1) <- BASE -> REDO(upd@t2) -> REDO(upd@t3) -> REDO(del@t3)
+    // tablet MRS: UNDO(del@t3) <- BASE -> REDO(del@t3) -> REDO(reins@t3) -> REDO(del@t4)
+    {TEST_FLUSH_TABLET},
+  });
+}
+
+// Test that when we delete from a transactional MRS and insert to the main MRS
+// at the same time, the generated row history is correct.
+TEST_F(FuzzTest, TestRedoHistoryInterleavingRowsets) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+    {TEST_INSERT, 0},
+    {TEST_BEGIN_TXN, 25},
+    {TEST_INSERT_IGNORE, 1, 25},
+    {TEST_FLUSH_OPS, 25},
+    {TEST_COMMIT_TXN, 25},
+
+    // Have some REDOs land in the transactional MRS.
+    {TEST_UPDATE, 1},
+    {TEST_DELETE, 1},
+
+    // Have another insert land in the main MRS, and give it an update at the
+    // same timestamp.
+    {TEST_INSERT, 1},
+    {TEST_UPDATE, 1},
+    {TEST_FLUSH_OPS, -1},
+
+    // Finally, delete the row at another timestamp.
+    {TEST_DELETE, 1},
+    {TEST_FLUSH_OPS, -1},
+
+    // We should end up with two compaction inputs:
+    // txn MRS:     UNDO(del@t1) <- BASE -> REDO(upd@t2) -> REDO(del@t2)
+    // tablet MRS:  UNDO(del@t2) <- BASE -> REDO(upd@t2) -> REDO(del@t3)
+    // where t1 < t2 < t3
+    //
+    // These should flush to:
+    // UNDO(del@t1) <- UNDO(upd@t2) <- UNDO(reins@t2) \
+    //     <- UNDO(del@t2) <- UNDO(upd@t2) <- BASE -> REDO(del@t3)
+    //
+    // Where: t1 = 6, t2 = 12, t3 = 14
+    {TEST_FLUSH_TABLET},
+
+    // Run scans to validate we have the correct values in each range.
+    {TEST_SCAN_AT_TIMESTAMP, 6},
+    {TEST_SCAN_AT_TIMESTAMP, 7},
+    {TEST_SCAN_AT_TIMESTAMP, 12},
+    {TEST_SCAN_AT_TIMESTAMP, 13},
+    {TEST_SCAN_AT_TIMESTAMP, 14},
+    {TEST_SCAN_AT_TIMESTAMP, 15},
+  });
+}
+
+TEST_F(FuzzTest, TestDontTransferUpdateWithSameTimestampAsDelete) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+    {TEST_INSERT, 0},
+    {TEST_BEGIN_TXN, 25},
+    {TEST_INSERT, 1, 25},
+    {TEST_FLUSH_OPS, 25},
+    {TEST_COMMIT_TXN, 25},
+
+    // Have a REDO land in the txn MRS to disambiguate which rowset input
+    // is older.
+    {TEST_UPDATE, 1},
+    {TEST_FLUSH_OPS, -1},
+
+    // Now delete the row and insert to a different rowset (the main MRS). Give
+    // it an update to make it somewhat ambiguous whether there's overlap,
+    // since both input rows end at the same timestamp.
+    {TEST_DELETE, 1},
+    {TEST_INSERT, 1},
+    {TEST_UPDATE, 1},
+    {TEST_FLUSH_OPS, -1},
+
+    // We should end up with two compaction input rows:
+    // txn MRS:     UNDO(del@t1) <- BASE -> REDO(upd@t2) -> REDO(del@t3)
+    // tablet MRS:  UNDO(del@t3) <- BASE -> REDO(upd@t3)
+    // where t1 < t2 < t3
+    //
+    // A flush should correctly determine that we don't need to transfer the
+    // REDO(upd@t3) to the txn MRS input row, which would be catastrophic since
+    // we'd be left with a delete followed by an update with no reinsert in the
+    // txn MRS's input row.
+    {TEST_FLUSH_TABLET},
+  });
+}
+
 } // namespace tablet
 } // namespace kudu
 
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index aeb9570..4ccba4c 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -27,6 +27,7 @@
 #include <random>
 #include <string>
 #include <thread>
+#include <unordered_set>
 #include <vector>
 
 #include <gflags/gflags.h>
@@ -51,6 +52,7 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/casts.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/diskrowset.h"
@@ -70,6 +72,7 @@
 #include "kudu/util/faststring.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
@@ -93,6 +96,7 @@
 using std::string;
 using std::thread;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 
 namespace kudu {
@@ -143,6 +147,26 @@
     op.FinishApplying();
   }
 
+  void DeleteAndInsertRow(MemRowSet* mrs_to_delete, MemRowSet* mrs_to_insert,
+                          int row_key, int32_t val, bool also_update) {
+    ScopedOp op(&mvcc_, clock_.Now());
+    op.StartApplying();
+    DeleteRowInOp(mrs_to_delete, op, row_key);
+    InsertRowInOp(mrs_to_insert, op, row_key, val);
+    if (also_update) {
+      UpdateRowInOp(mrs_to_insert, op, row_key, val);
+    }
+    op.FinishApplying();
+  }
+
+  void InsertAndDeleteRow(MemRowSet* mrs, int row_key, int32_t val) {
+    ScopedOp op(&mvcc_, clock_.Now());
+    op.StartApplying();
+    InsertRowInOp(mrs, op, row_key, val);
+    DeleteRowInOp(mrs, op, row_key);
+    op.FinishApplying();
+  }
+
   void BuildRow(int row_key, int32_t val) {
     row_builder_.Reset();
     snprintf(key_buf_, sizeof(key_buf_), kRowKeyFormat, row_key);
@@ -378,11 +402,7 @@
     shared_ptr<MemRowSet> mrs;
     CHECK_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
                                mem_trackers_.tablet_tracker, &mrs));
-    ScopedOp op(&mvcc_, clock_.Now());
-    op.StartApplying();
-    InsertRowInOp(mrs.get(), op, 0, 0);
-    DeleteRowInOp(mrs.get(), op, 0);
-    op.FinishApplying();
+    InsertAndDeleteRow(mrs.get(), 0, 0);
     return mrs;
   }
 
@@ -1104,6 +1124,132 @@
   ASSERT_EQ(10, CountRows(result_rs));
 }
 
+TEST_F(TestCompaction, TestRandomizeDuplicatedRowsAcrossTransactions) {
+  ThreadSafeRandom prng(SeedRandom());
+  constexpr int kMaxIters = 32;
+  constexpr int kMinIters = 2;
+  shared_ptr<MemRowSet> main_mrs;
+  int mrs_id = 0;
+  ASSERT_OK(MemRowSet::Create(mrs_id++, schema_, log_anchor_registry_.get(),
+            mem_trackers_.tablet_tracker, &main_mrs));
+
+  // Keep track of our transactional MRSs. Since we can only mutate a row in
+  // a transactional MRS after committing, we'll treat these MRSs as having
+  // committed immediately after being inserted to.
+  unordered_set<shared_ptr<MemRowSet>> txn_mrss;
+
+  // Keep track of which MRS has the live row, if any.
+  MemRowSet* mrs_with_live_row = nullptr;
+
+  int num_iters = kMinIters + prng.Uniform(kMaxIters - kMinIters);
+  for (int i = 0; i < num_iters; i++) {
+    const auto one_of_three = prng.Next() % 3;
+    if (mrs_with_live_row) {
+      switch (one_of_three) {
+        case 0:
+          LOG(INFO) << Substitute("Deleting row from mrs $0", mrs_with_live_row->mrs_id());
+          NO_FATALS(DeleteRow(mrs_with_live_row, 1));
+          mrs_with_live_row = nullptr;
+          break;
+        case 1:
+          LOG(INFO) << Substitute("Updating row from mrs $0", mrs_with_live_row->mrs_id());
+          NO_FATALS(UpdateRow(mrs_with_live_row, 1, 1337));
+          break;
+        case 2: {
+          // For some added spice, let's also sometimes update the row in the
+          // same op.
+          bool update = prng.Next() % 2;
+          LOG(INFO) << Substitute("Deleting row from mrs $0, inserting $1to mrs $2",
+                                  mrs_with_live_row->mrs_id(), update ? "and updating " : "",
+                                  main_mrs->mrs_id());
+          NO_FATALS(DeleteAndInsertRow(mrs_with_live_row, main_mrs.get(), 1, 0, update));
+          mrs_with_live_row = main_mrs.get();
+          break;
+        }
+      }
+      continue;
+    }
+    switch (one_of_three) {
+      case 0:
+        LOG(INFO) << Substitute("Inserting row into mrs $0", main_mrs->mrs_id());
+        NO_FATALS(InsertRow(main_mrs.get(), 1, 0));
+        mrs_with_live_row = main_mrs.get();
+        break;
+      case 1: {
+        shared_ptr<MemRowSet> txn_mrs;
+        ASSERT_OK(MemRowSet::Create(mrs_id++, schema_, log_anchor_registry_.get(),
+                                    mem_trackers_.tablet_tracker, &txn_mrs));
+        LOG(INFO) << Substitute("Inserting into mrs $0 and committing", txn_mrs->mrs_id());
+        NO_FATALS(InsertRow(txn_mrs.get(), 1, 0));
+        mrs_with_live_row = txn_mrs.get();
+        EmplaceOrDie(&txn_mrss, std::move(txn_mrs));
+        break;
+      }
+      case 2:
+        LOG(INFO) << Substitute("Inserting row into mrs $0 and deleting", main_mrs->mrs_id());
+        NO_FATALS(InsertAndDeleteRow(main_mrs.get(), 1, 0));
+        break;
+    }
+  }
+  MvccSnapshot snap(mvcc_);
+  vector<shared_ptr<CompactionInput>> merge_inputs;
+  merge_inputs.emplace_back(CompactionInput::Create(*main_mrs, &schema_, snap));
+  for (auto& mrs : txn_mrss) {
+    merge_inputs.emplace_back(CompactionInput::Create(*mrs, &schema_, snap));
+  }
+  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
+  vector<shared_ptr<DiskRowSet>> result_rs;
+  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
+  ASSERT_EQ(1, result_rs.size());
+  ASSERT_EQ(mrs_with_live_row ? 1 : 0, CountRows(result_rs));
+}
+
+// Test that we can merge rowsets in which we have a row whose liveness jumps
+// back and forth between rowsets over time.
+TEST_F(TestCompaction, TestRowHistoryJumpsBetweenRowsets) {
+  shared_ptr<MemRowSet> mrs_a;
+  ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+                              mem_trackers_.tablet_tracker, &mrs_a));
+  shared_ptr<MemRowSet> mrs_b;
+  ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(),
+                              mem_trackers_.tablet_tracker, &mrs_b));
+  shared_ptr<MemRowSet> mrs_c;
+  ASSERT_OK(MemRowSet::Create(2, schema_, log_anchor_registry_.get(),
+                              mem_trackers_.tablet_tracker, &mrs_c));
+  // Interleave the history of a row across three MRSs.
+  InsertRows(mrs_a.get(), 1, 0);
+  DeleteRows(mrs_a.get(), 1, 0);
+  InsertRows(mrs_b.get(), 1, 0);
+  DeleteRows(mrs_b.get(), 1, 0);
+  InsertRows(mrs_a.get(), 1, 0);
+  DeleteRows(mrs_a.get(), 1, 0);
+  InsertRows(mrs_c.get(), 1, 0);
+  DeleteRows(mrs_c.get(), 1, 0);
+  InsertRows(mrs_a.get(), 1, 0);
+  DeleteRows(mrs_a.get(), 1, 0);
+
+  // At this point, our compaction input rows look like:
+  // MRS a: UNDO(del@ts1) <- 0 -> REDO(del@ts2) -> REDO(reins@ts5) -> REDO(del@ts6)
+  //                             -> REDO(reins@ts9) -> REDO(del@ts10)
+  // MRS b: UNDO(del@ts3) <- 0 -> REDO(del@ts4)
+  // MRS c: UNDO(del@ts7) <- 0 -> REDO(del@ts8)
+  //
+  // Despite the overlapping time ranges across these inputs, the compaction
+  // should go off without a hitch.
+  MvccSnapshot snap(mvcc_);
+  vector<shared_ptr<CompactionInput> > merge_inputs {
+    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
+    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap)),
+    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_c, &schema_, snap)),
+  };
+  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
+  vector<shared_ptr<DiskRowSet>> result_rs;
+  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
+  ASSERT_EQ(1, result_rs.size());
+  ASSERT_EQ(0, CountRows(result_rs));
+}
+
 // Like the above test, but with all invisible rowsets.
 TEST_F(TestCompaction, TestMergeMRSWithAllInvisibleRows) {
   shared_ptr<MemRowSet> mrs_a = CreateInvisibleMRS();
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 07c7e28..83a08f0 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -99,6 +99,13 @@
 const int kCompactionOutputBlockNumRows = 100;
 
 // Advances to the last mutation in a mutation list.
+void AdvanceToLastInList(Mutation** m) {
+  if (*m == nullptr) return;
+  Mutation* next;
+  while ((next = (*m)->acquire_next()) != nullptr) {
+    *m = next;
+  }
+}
 void AdvanceToLastInList(const Mutation** m) {
   if (*m == nullptr) return;
   const Mutation* next;
@@ -299,70 +306,165 @@
   };
 };
 
-// Compares two duplicate rows before compaction (and before the REDO->UNDO transformation).
-// Returns -1 if 'left' is less recent than 'right', 1 otherwise. Never returns 0.
+// Compares two duplicate rows before compaction (and before the REDO->UNDO
+// transformation). Returns 1 if 'left' is more recent than 'right', -1
+// otherwise. Never returns 0.
+//
+// The resulting order will determine the order in which ghost rows are linked.
+//
+// It's possible that transactional inserts and non-transactional inserts
+// of the same row land in separate rowsets, and that a row's liveness will
+// bounce between the main MRS and a transactional MRS:
+// - Row 'a' is inserted into the main MRS @ 5
+// - Row 'a' is deleted from the main MRS @ 10
+// - Row 'a' is inserted into a transactional MRS, committed @ 15
+// - Row 'a' is deleted from the transactional MRS @ 20
+// - Row 'a' is inserted again into the main MRS @ 25
+//
+// The result of this sequence is that we have two compaction input rows with
+// REDO time ranges whose REDO timestamps overlap, making it less
+// straightforward to determine which is newer:
+//
+//   UNDO(del@5) <- BASE(a) -> REDO(del@10) -> REDO(reins@25)  // main MRS
+//   UNDO(del@15) <- BASE(a) -> REDO(del@20)                   // txn MRS
+//
+// This method selects the input whose REDO head is newer as the newer input.
+// However, callers should detect such cases and update the lists to have
+// non-overlapping time ranges. For example, after the call to PrepareBlock()
+// completes, the merged row should be updated as below, with REDO(reins@25)
+// transferred onto the newer input:
+//
+//   UNDO(del@15) <- BASE(a) -> REDO(del@20) -> REDO(reins@25)
+//                      | previous_ghost
+//                      v
+//   UNDO(del@5) <- BASE(a) -> REDO(del@10)
+//
+// To detect such cases, if set, 'redos_overlap' returns true if there exist
+// REDOs in the older input that are of equal or higher timestamp than the
+// first REDO in the newer input. Callers can then transfer those newer
+// mutations onto the newer input.
+//
+// Note that the transferral can be done by simply appending the older
+// input's newer mutations onto the newer input, and we don't have to worry
+// about mutations interleaving, as in the below bad example:
+//
+//   UNDO(del@5) <- BASE(a) -> REDO(del@10) -> REDO(reins@25)  // main MRS
+//   UNDO(del@15) <- BASE(a) -> REDO(del@20) -> REDO(reins@30) // txn MRS
+//
+// In this bad example, transferral via append would result in:
+//
+//   UNDO(del@5) <- BASE(a) -> REDO(del@10)
+//   UNDO(del@15) <- BASE(a) -> REDO(del@20) -> REDO(reins@30) -> REDO(reins@25)
+//
+// This history would not make sense, since reinserts cannot directly follow
+// one another without a delete separating them. However, the initial input of
+// this example is not possible, as for it to be the case, two rowsets would
+// have to commit their inserts, be deleted from, and then commit reinserts
+// again. However, only the main tablet MRS can be reinserted to after
+// committing inserts -- once a transaction commits, its transactional MRS is
+// no longer eligible for inserts. Since only a single rowset in the
+// tablet can be reinserted to, the above scenario cannot happen.
 int CompareDuplicatedRows(const CompactionInputRow& left,
-                          const CompactionInputRow& right) {
+                          const CompactionInputRow& right,
+                          bool* redos_overlap = nullptr) {
   const Mutation* left_last = left.redo_head;
   const Mutation* right_last = right.redo_head;
-  AdvanceToLastInList(&left_last);
-  AdvanceToLastInList(&right_last);
 
   if (left.redo_head == nullptr) {
 #ifndef NDEBUG
     // left must still be alive, meaning right must have at least a DELETE redo.
     if (PREDICT_TRUE(FLAGS_dcheck_on_kudu_2233_invariants)) {
-      DCHECK(right_last != nullptr);
-      DCHECK(right_last->changelist().is_delete());
+      // Only do the validations in DEBUG mode. RELEASE builds should fail the
+      // tablet.
+      AdvanceToLastInList(&right_last);
+      CHECK(right_last != nullptr);
+      CHECK(right_last->changelist().is_delete());
     }
 #endif
+    if (redos_overlap) *redos_overlap = false;
     return 1;
   }
   if (right.redo_head == nullptr) {
 #ifndef NDEBUG
     // right must still be alive, meaning left must have at least a DELETE redo.
     if (PREDICT_TRUE(FLAGS_dcheck_on_kudu_2233_invariants)) {
-      DCHECK(left_last != nullptr);
-      DCHECK(left_last->changelist().is_delete());
+      // Only do the validations in DEBUG mode. RELEASE builds should fail the
+      // tablet.
+      AdvanceToLastInList(&left_last);
+      CHECK(left_last != nullptr);
+      CHECK(left_last->changelist().is_delete());
     }
 #endif
+    if (redos_overlap) *redos_overlap = false;
     return -1;
   }
+  AdvanceToLastInList(&right_last);
+  AdvanceToLastInList(&left_last);
 
-  // Duplicated rows usually have disjoint redo histories, meaning the first mutation
-  // should be enough for the sake of determining the most recent row in most cases.
+  // Duplicated rows usually have disjoint redo histories, meaning the first
+  // mutation should be enough for the sake of determining the most recent row
+  // in most cases.
   int ret = left.redo_head->timestamp().CompareTo(right.redo_head->timestamp());
 
-  if (ret > 0) {
+  if (ret != 0) {
+    if (redos_overlap) {
+      bool left_newer = ret > 0;
+      const Mutation* newer_row_head = left_newer ? left.redo_head : right.redo_head;
+      const Mutation* newer_row_last = left_newer ? left_last : right_last;
+      const Mutation* older_row_last = left_newer ? right_last : left_last;
+      // There is overlap between the rows if the older row's last timestamp >
+      // the newer row's head.
+      int older_last_vs_newer_head =
+          older_row_last->timestamp().CompareTo(newer_row_head->timestamp());
+      *redos_overlap = older_last_vs_newer_head > 0 ||
+          // If the older row's tail has the same timestamp as the newer row's
+          // head, the row must have been deleted, reinserted, and updated
+          // (maybe even deleted again) at the same timestamp. If so, the older
+          // row should end in a delete, and if not, there is overlap.
+          (older_last_vs_newer_head == 0 &&
+           older_row_last->timestamp() >= newer_row_last->timestamp() &&
+           (!older_row_last->changelist().is_delete() ||
+            // Since the older row's last is a delete, it's safe to move
+            // newer's history onto it.
+            newer_row_last->changelist().is_delete()));
+    }
     return ret;
   }
 
-  if (PREDICT_TRUE(ret < 0)) {
-    return ret;
-  }
-
-  // In case the histories aren't disjoint it must be because one row was deleted in the
-  // the same operation that inserted the other one, which was then mutated.
+  // The only way that the redo heads have the same timestamp is if a delete,
+  // insert, and update of the same row are all assigned the same timestamp.
   // For instance this is a valid history with non-disjoint REDO histories:
   // -- Row 'a' lives in DRS1
-  // Update row 'a' @ 10
+  // Update row 'a' @ 10  // This is the first redo for DRS1's input row.
   // Delete row 'a' @ 10
-  // Insert row 'a' @ 10
+  // Insert row 'a' @ 10  // A new input row is added for the MRS.
   // -- Row 'a' lives in the MRS
-  // Update row 'a' @ 10
+  // Update row 'a' @ 10  // This is the first redo for the MRS.
   // -- Flush the MRS into DRS2
   // -- Compact DRS1 and DRS2
-  //
-  // We can't have the case here where both 'left' and 'right' have a DELETE as the last
-  // mutation at the same timestamp. This would be troublesome as we would possibly have
-  // no way to decide which version is the most up-to-date (one or both version's undos
-  // might have been garbage collected). See MemRowSetCompactionInput::PrepareBlock().
-
+  if (redos_overlap) {
+    *redos_overlap = false;
+  }
   // At least one of the rows must have a DELETE REDO as its last redo.
   CHECK(left_last->changelist().is_delete() || right_last->changelist().is_delete());
-  // We made sure that rows that are inserted and deleted in the same operation can never
-  // be part of a compaction input so they can't both be deletes.
-  CHECK(!(left_last->changelist().is_delete() && right_last->changelist().is_delete()));
+  if (left_last->changelist().is_delete() && right_last->changelist().is_delete()) {
+    // We can't have the case here where both 'left' and 'right' have a DELETE
+    // as the last mutation at the same timestamp, e.g. if we also deleted the
+    // row in the MRS in the above example. This would be troublesome as we
+    // would possibly have no way to decide which version is the most
+    // up-to-date (one or both version's undos might have been garbage
+    // collected). See MemRowSetCompactionInput::PrepareBlock().
+    //
+    // If the last changes are both deletes, it must be because we deleted the
+    // row in one compaction input, inserted it into the other, and then
+    // deleted the row again. In that case, the delete with the higher
+    // timestamp defines the newer input.
+    int ret = left_last->timestamp().CompareTo(right_last->timestamp());
+    CHECK_NE(0, ret);
+    return ret;
+  }
 
   // If 'left' doesn't have a delete then it's the latest version.
   if (!left_last->changelist().is_delete() && right_last->changelist().is_delete()) {
@@ -388,6 +490,36 @@
   }
 }
 
+void AdvanceWhileNextLessThan(Mutation** head, Timestamp ts) {
+  while ((*head)->next() != nullptr && (*head)->next()->timestamp() < ts) {
+    *head = (*head)->next();
+  }
+}
+
+// Transfers all of 'older's mutations with timestamps at or above that of
+// the first REDO in 'newer' onto 'newer'.
+void TransferRedoHistory(CompactionInputRow* newer, CompactionInputRow* older) {
+  CHECK(newer->redo_head);
+  CHECK(older->redo_head);
+  DCHECK_GT(newer->redo_head->timestamp(), older->redo_head->timestamp());
+
+  const auto& newer_head_ts = newer->redo_head->timestamp();
+  // Find the last mutation in 'older' whose timestamp is still below that of
+  // the REDO head of 'newer'. The linked list following that mutation must be
+  // transferred onto 'newer'.
+  Mutation* older_highest_below_ts = older->redo_head;
+  AdvanceWhileNextLessThan(&older_highest_below_ts, newer_head_ts);
+  DCHECK(older_highest_below_ts);
+  auto* transferred_history = older_highest_below_ts->next();
+
+  Mutation* newer_last = newer->redo_head;
+  AdvanceToLastInList(&newer_last);
+  DCHECK(newer_last);
+  newer_last->set_next(transferred_history);
+  older_highest_below_ts->set_next(nullptr);
+}
+
 class MergeCompactionInput : public CompactionInput {
  private:
   // State kept for each of the inputs.
@@ -488,7 +620,9 @@
       int smallest_idx = -1;
       CompactionInputRow* smallest = nullptr;
 
-      // Iterate over the inputs to find the one with the smallest next row.
+      // Iterate over the inputs to find the one with the smallest next row,
+      // merging duplicates as ghost rows in decreasing timestamp order.
+      //
       // It may seem like an O(n lg k) merge using a heap would be more efficient,
       // but some benchmarks indicated that the simpler code path of the O(n k) merge
       // actually ends up a bit faster.
@@ -522,16 +656,25 @@
 
         // If we found two rows with the same key, we want to make the newer
         // one point to the older one, which must be a ghost.
         if (PREDICT_FALSE(row_comp == 0)) {
           DVLOG(4) << "Duplicate row.\nLeft: " << CompactionInputRowToString(*state->next())
                    << "\nRight: " << CompactionInputRowToString(*smallest);
-          int mutation_comp = CompareDuplicatedRows(*state->next(), *smallest);
+          bool redos_overlap = false;
+          int mutation_comp = CompareDuplicatedRows(*state->next(), *smallest, &redos_overlap);
           CHECK_NE(mutation_comp, 0);
           if (mutation_comp > 0) {
-            // If the previous smallest row has a highest version that is lower
-            // than this one, clone it as the previous version and discard the original.
+            if (redos_overlap) {
+              TransferRedoHistory(state->next(), smallest);
+            }
+            // This input has higher redo timestamps than 'smallest'. Clone
+            // 'smallest' as this input's previous ghost and discard the
+            // original.
             RETURN_NOT_OK(SetPreviousGhost(state->next(), smallest, true /* clone */,
                                            state->input->PreparedBlockArena()));
+
+            // Now that the previous 'smallest' has been folded into
+            // 'state->next()', pop it and make this input's row the new
+            // smallest.
             states_[smallest_idx]->pop_front();
             smallest_idx = i;
             smallest = state->next();
@@ -539,6 +682,9 @@
                      << CompactionInputRowToString(*smallest);
             continue;
           }
+          if (redos_overlap) {
+            TransferRedoHistory(smallest, state->next());
+          }
           // .. otherwise copy and pop the other one.
           RETURN_NOT_OK(SetPreviousGhost(smallest, state->next(), true /* clone */,
                                          DCHECK_NOTNULL(smallest)->row.row_block()->arena()));
@@ -679,44 +825,47 @@
   }
 
   // Merges the 'previous_ghost' histories of 'older' and 'newer' such that
-  // 'older->previous_ghost' is the head of a list of rows in increasing
-  // timestamp order (deltas get newer down the list).
+  // 'newer->previous_ghost' is the head of a list of rows in decreasing
+  // timestamp order (deltas get older down the list).
   //
   // 'must_copy' indicates whether there must be a deep copy (using
   // CloneCompactionInputRow()).
-  Status SetPreviousGhost(CompactionInputRow* older,
-                          CompactionInputRow* newer,
+  //
+  // 'arena' is the arena of 'newer', in which the cloned ghosts and their
+  // mutations will be allocated.
+  Status SetPreviousGhost(CompactionInputRow* newer,
+                          CompactionInputRow* older,
                           bool must_copy,
                           Arena* arena) {
     CHECK(arena != nullptr) << "Arena can't be null";
-    // Check if we already had a previous version and, if yes, whether 'newer' is more or less
-    // recent.
-    if (older->previous_ghost != nullptr) {
-      if (CompareDuplicatedRows(*older->previous_ghost, *newer) > 0) {
-        // 'older->previous_ghost' was more recent.
-        return SetPreviousGhost(older->previous_ghost, newer, must_copy /* clone */, arena);
+    // Check if 'newer' already had a previous version and, if so, whether
+    // 'older' is more or less recent.
+    if (newer->previous_ghost != nullptr) {
+      if (CompareDuplicatedRows(*newer->previous_ghost, *older) > 0) {
+        // 'newer->previous_ghost' was more recent.
+        return SetPreviousGhost(newer->previous_ghost, older, must_copy /* clone */, arena);
       }
-      // 'newer' was more recent.
+      // 'older' was more recent.
       if (must_copy) {
-        CompactionInputRow* newer_copy;
-        RETURN_NOT_OK(CloneCompactionInputRow(newer, &newer_copy, arena));
-        newer = newer_copy;
+        CompactionInputRow* older_copy;
+        RETURN_NOT_OK(CloneCompactionInputRow(older, &older_copy, arena));
+        older = older_copy;
       }
-      // 'older->previous_ghost' is already in 'arena' so avoid the copy.
-      RETURN_NOT_OK(SetPreviousGhost(newer,
-                                     older->previous_ghost,
+      // 'newer->previous_ghost' is already in 'arena' so avoid the copy.
+      RETURN_NOT_OK(SetPreviousGhost(older,
+                                     newer->previous_ghost,
                                      false /* don't clone */,
                                      arena));
-      older->previous_ghost = newer;
+      newer->previous_ghost = older;
       return Status::OK();
     }
 
     if (must_copy) {
-      CompactionInputRow* newer_copy;
-      RETURN_NOT_OK(CloneCompactionInputRow(newer, &newer_copy, arena));
-      newer = newer_copy;
+      CompactionInputRow* older_copy;
+      RETURN_NOT_OK(CloneCompactionInputRow(older, &older_copy, arena));
+      older = older_copy;
     }
-    older->previous_ghost = newer;
+    newer->previous_ghost = older;
     return Status::OK();
   }
 
@@ -756,8 +905,9 @@
     *head = (*head)->next();
   }
 }
-
-// Merges two undo histories into one with decreasing timestamp order and returns the new head.
+// Merges two undo histories into one with decreasing timestamp order and
+// returns the new head. Assumes that all mutation memory is kept in the same
+// arena and therefore makes no effort to copy memory.
 Mutation* MergeUndoHistories(Mutation* left, Mutation* right) {
 
   if (PREDICT_FALSE(left == nullptr)) {
@@ -809,7 +959,7 @@
   CompactionInputRow* previous_ghost = old_row->previous_ghost;
   while (previous_ghost != nullptr) {
 
-    // First step is to transform the old rows REDO's into undos, if there are any.
+    // First step is to transform the old row's REDOs into UNDOs, if there are any.
     // This simplifies this for several reasons:
     // - will be left with most up-to-date version of the old row
     // - only have one REDO left to deal with (the delete)
@@ -1053,7 +1203,7 @@
        redo_mut != nullptr;
        redo_mut = redo_mut->acquire_next()) {
 
-    // Skip anything not committed.
+    // Skip anything not applied.
     if (!snap.IsApplied(redo_mut->timestamp())) {
       break;
     }
@@ -1235,6 +1385,26 @@
 
       DVLOG(4) << "Output Row: " << dst_row.schema()->DebugRow(dst_row)
                << "; RowId: " << index_in_current_drs;
+#ifndef NDEBUG
+      auto* u = new_undos_head;
+      bool is_deleted = false;
+      // The resulting list should have the following invariants:
+      // - deletes can only be observed if not already deleted
+      // - reinserts can only be observed if deleted
+      // - UNDO mutations are in decreasing timestamp order
+      while (u != nullptr) {
+        if (u->changelist().is_delete()) {
+          CHECK(!is_deleted);
+          is_deleted = true;
+        } else if (u->changelist().is_reinsert()) {
+          CHECK(is_deleted);
+          is_deleted = false;
+        }
+        if (!u->next()) break;
+        CHECK_GE(u->timestamp(), u->next()->timestamp());
+        u = u->next();
+      }
+#endif // NDEBUG
 
       n++;
       if (n == block.nrows()) {
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index c194fdf..7ffd7e3 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -183,8 +183,8 @@
   // The current undo head for this row, may be null if all undos were garbage collected.
   Mutation* undo_head;
 
-  // When the same row is found in multiple rowsets because of ghost rows, this points
-  // to one that older in terms of row history.
+  // When the same row is found in multiple rowsets because of ghost rows, this
+  // points to one that is older in terms of row history.
   CompactionInputRow* previous_ghost;
 
   CompactionInputRow() :
diff --git a/src/kudu/tablet/mutation.h b/src/kudu/tablet/mutation.h
index d8c076f..bd9fe76 100644
--- a/src/kudu/tablet/mutation.h
+++ b/src/kudu/tablet/mutation.h
@@ -70,6 +70,10 @@
     return reinterpret_cast<const Mutation*>(base::subtle::Acquire_Load(
         reinterpret_cast<const AtomicWord*>(&next_)));
   }
+  Mutation* acquire_next() {
+    return reinterpret_cast<Mutation*>(base::subtle::Acquire_Load(
+        reinterpret_cast<AtomicWord*>(&next_)));
+  }
 
   void set_next(Mutation *next) {
     next_ = next;