KUDU-3191: fail replicas when KUDU-2233 is detected

Despite the longstanding fixes that stop bad KUDU-2233 compactions,
users still see the results of already corrupted data, particularly when
upgrading to newer versions that may compact more aggressively than
older versions.

Rather than crashing the server when a KUDU-2233 failure is hit, this
patch fails the affected replica instead. As with disk failures or
CFile checksum corruption, this triggers re-replication, and the
failed replica is only evicted if there is a healthy majority.

The hope is that fewer users will see this corruption cause problems, as
the corruption will henceforth not crash servers, and only tablets with
a majority corrupted will be unavailable.

Change-Id: I43570b961dfd5eb8518328121585255d32cf2ebb
Reviewed-on: http://gerrit.cloudera.org:8080/16471
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index b045223..8a32d57 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -115,6 +115,9 @@
 
   // For CFile corruptions.
   CFILE_CORRUPTION,
+
+  // For broken invariants caused by KUDU-2233.
+  KUDU_2233_CORRUPTION,
 };
 
 // When certain operations fail, the side effects of the error can span multiple
diff --git a/src/kudu/integration-tests/disk_failure-itest.cc b/src/kudu/integration-tests/disk_failure-itest.cc
index e749103..0feaca4 100644
--- a/src/kudu/integration-tests/disk_failure-itest.cc
+++ b/src/kudu/integration-tests/disk_failure-itest.cc
@@ -30,6 +30,8 @@
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -38,6 +40,7 @@
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/tablet/tablet.pb.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -46,15 +49,24 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+METRIC_DECLARE_entity(tablet);
+METRIC_DECLARE_gauge_size(num_rowsets_on_disk);
 METRIC_DECLARE_gauge_uint64(data_dirs_failed);
 METRIC_DECLARE_gauge_uint32(tablets_num_failed);
 
+using kudu::client::sp::shared_ptr;
+using kudu::client::KuduDelete;
+using kudu::client::KuduInsert;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
 using kudu::cluster::ExternalDaemon;
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::ExternalTabletServer;
 using kudu::fs::BlockManager;
+using kudu::KuduPartialRow;
 using std::pair;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -158,6 +170,7 @@
 enum class ErrorType {
   CFILE_CORRUPTION,
   DISK_FAILURE,
+  KUDU_2233_CORRUPTION,
 };
 
 class DiskErrorITestBase : public ExternalMiniClusterITestBase,
@@ -190,6 +203,9 @@
       case ErrorType::CFILE_CORRUPTION:
         injection_flags.emplace_back("cfile_inject_corruption", "1.0");
         break;
+      case ErrorType::KUDU_2233_CORRUPTION:
+        injection_flags.emplace_back("tablet_inject_kudu_2233", "1.0");
+        break;
     }
     return injection_flags;
   }
@@ -252,6 +268,7 @@
         // First, stop injecting errors.
         { "env_inject_eio", "0.0" },
         { "cfile_inject_corruption", "0.0" },
+        { "tablet_inject_kudu_2233", "0.0" },
 
         // Then allow for recovery.
         { "enable_tablet_copy", "true" },
@@ -319,6 +336,94 @@
   NO_FATALS(v.CheckCluster());
 }
 
+// Test targeting KUDU-2233, though reused for additional coverage of CFile
+// checksum failures and disk errors.
+class CompactionsAndDeletionsFailureITest : public TabletServerDiskErrorITest {
+ public:
+  void SetUp() override {
+    ExternalMiniClusterOptions opts;
+    opts.num_tablet_servers = 3;
+    opts.num_data_dirs = kNumDataDirs;
+    opts.extra_tserver_flags = {
+      // Flush frequently so we actually get some data blocks.
+      "--flush_threshold_secs=1",
+      "--flush_threshold_mb=1",
+      // Prevent compactions so we can explicitly enable them later.
+      "--enable_rowset_compaction=false",
+      // Prevent tablet copies so we can explicitly enable and monitor them.
+      "--enable_tablet_copy=false",
+    };
+    NO_FATALS(StartClusterWithOpts(std::move(opts)));
+    workload_.reset(new TestWorkload(cluster_.get()));
+    workload_->set_num_tablets(1);
+    workload_->Setup();
+  }
+ protected:
+  unique_ptr<TestWorkload> workload_;
+};
+
+TEST_P(CompactionsAndDeletionsFailureITest, TestRecovery) {
+  constexpr const int kKeyMax = 10;
+
+  // Insert and delete rows, and continue to insert until we get multiple
+  // rowsets.
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(workload_->table_name(), &table));
+  const auto insert_row = [&] (int key) {
+    std::unique_ptr<KuduInsert> insert(table->NewInsert());
+    KuduPartialRow* row = insert->mutable_row();
+    RETURN_NOT_OK(row->SetInt32("key", key));
+    RETURN_NOT_OK(row->SetInt32("int_val", key));
+    RETURN_NOT_OK(session->Apply(insert.release()));
+    return session->Flush();
+  };
+  const auto delete_row = [&] (int key) {
+    std::unique_ptr<KuduDelete> del(table->NewDelete());
+    KuduPartialRow* row = del->mutable_row();
+    RETURN_NOT_OK(row->SetInt32("key", key));
+    RETURN_NOT_OK(session->Apply(del.release()));
+    return session->Flush();
+  };
+  const auto get_num_diskrowsets = [&] (const ExternalTabletServer* ets,
+                                        const char* tablet_id) {
+    int64_t num_drss = 0;
+    CHECK_OK(itest::GetInt64Metric(ets->bound_http_hostport(),
+        &METRIC_ENTITY_tablet, tablet_id, &METRIC_num_rowsets_on_disk, "value", &num_drss));
+    return num_drss;
+  };
+  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
+  itest::TServerDetails* ts = ts_map_[error_ts->uuid()];
+  vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 1, MonoDelta::FromSeconds(32), &tablets));
+  ASSERT_EQ(1, tablets.size());
+  const string& tablet_id = tablets[0].tablet_status().tablet_id();
+  int num_inserts = 0;
+  while (get_num_diskrowsets(error_ts, tablet_id.c_str()) < 2) {
+    auto key = num_inserts++ % kKeyMax;
+    ASSERT_OK(insert_row(key));
+    ASSERT_OK(delete_row(key));
+  }
+  // Enable compactions, which will trigger KUDU-2233 codepaths.
+  auto flag_list = InjectionFlags(GetParam(), error_ts);
+  flag_list.emplace_back("enable_rowset_compaction", "true");
+  ASSERT_OK(SetFlags(error_ts, flag_list));
+
+  // The tablet replica should fail and be re-replicated back up to full
+  // health once allowed.
+  NO_FATALS(WaitForFailedTablets(error_ts, 1));
+  ASSERT_OK(AllowRecovery());
+
+  // Wait for the cluster to return to a healthy state.
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+}
+
+INSTANTIATE_TEST_CASE_P(ErrorType, CompactionsAndDeletionsFailureITest,
+                        ::testing::Values(ErrorType::CFILE_CORRUPTION, ErrorType::DISK_FAILURE,
+                                          ErrorType::KUDU_2233_CORRUPTION));
+
 class MasterDiskErrorITest : public DiskErrorITestBase {
 };
 
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index f86776b..6efe084 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -108,6 +108,49 @@
 namespace kudu {
 namespace master {
 
+namespace {
+constexpr const char* kReplaceMasterMessage =
+    "this master may return incorrect results and should be replaced";
+void CrashMasterOnDiskError(const string& uuid) {
+  LOG(FATAL) << Substitute("Disk error detected on data directory $0: $1",
+                           uuid, kReplaceMasterMessage);
+}
+void CrashMasterOnCFileCorruption(const string& tablet_id) {
+  LOG(FATAL) << Substitute("CFile corruption detected on system catalog $0: $1",
+                           tablet_id, kReplaceMasterMessage);
+}
+void CrashMasterOnKudu2233Corruption(const string& tablet_id) {
+  LOG(FATAL) << Substitute("KUDU-2233 corruption detected on system catalog $0: $1 ",
+                           tablet_id, kReplaceMasterMessage);
+}
+
+// TODO(Alex Feinberg) this method should be moved to a separate class (along with
+// ListMasters), so that it can also be used in TS and client when
+// bootstrapping.
+Status GetMasterEntryForHost(const shared_ptr<rpc::Messenger>& messenger,
+                             const HostPort& hostport,
+                             ServerEntryPB* e) {
+  Sockaddr sockaddr;
+  RETURN_NOT_OK(SockaddrFromHostPort(hostport, &sockaddr));
+  MasterServiceProxy proxy(messenger, sockaddr, hostport.host());
+  GetMasterRegistrationRequestPB req;
+  GetMasterRegistrationResponsePB resp;
+  rpc::RpcController controller;
+  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_master_registration_rpc_timeout_ms));
+  RETURN_NOT_OK(proxy.GetMasterRegistration(req, &resp, &controller));
+  e->mutable_instance_id()->CopyFrom(resp.instance_id());
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  e->mutable_registration()->CopyFrom(resp.registration());
+  e->set_role(resp.role());
+  if (resp.has_cluster_id()) {
+    e->set_cluster_id(resp.cluster_id());
+  }
+  return Status::OK();
+}
+} // anonymous namespace
+
 Master::Master(const MasterOptions& opts)
   : KuduServer("Master", opts, "kudu.master"),
     state_(kStopped),
@@ -170,6 +213,8 @@
                                       &CrashMasterOnDiskError);
   fs_manager_->SetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION,
                                       &CrashMasterOnCFileCorruption);
+  fs_manager_->SetErrorNotificationCb(ErrorHandlerType::KUDU_2233_CORRUPTION,
+                                      &CrashMasterOnKudu2233Corruption);
 
   RETURN_NOT_OK(maintenance_manager_->Start());
 
@@ -290,44 +335,6 @@
   state_ = kStopped;
 }
 
-void Master::CrashMasterOnDiskError(const string& uuid) {
-  LOG(FATAL) << Substitute("Disk error detected on data directory $0", uuid);
-}
-
-void Master::CrashMasterOnCFileCorruption(const string& tablet_id) {
-  LOG(FATAL) << Substitute("CFile corruption detected on system catalog $0", tablet_id);
-}
-
-namespace {
-
-// TODO(Alex Feinberg) this method should be moved to a separate class (along with
-// ListMasters), so that it can also be used in TS and client when
-// bootstrapping.
-Status GetMasterEntryForHost(const shared_ptr<rpc::Messenger>& messenger,
-                             const HostPort& hostport,
-                             ServerEntryPB* e) {
-  Sockaddr sockaddr;
-  RETURN_NOT_OK(SockaddrFromHostPort(hostport, &sockaddr));
-  MasterServiceProxy proxy(messenger, sockaddr, hostport.host());
-  GetMasterRegistrationRequestPB req;
-  GetMasterRegistrationResponsePB resp;
-  rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_master_registration_rpc_timeout_ms));
-  RETURN_NOT_OK(proxy.GetMasterRegistration(req, &resp, &controller));
-  e->mutable_instance_id()->CopyFrom(resp.instance_id());
-  if (resp.has_error()) {
-    return StatusFromPB(resp.error().status());
-  }
-  e->mutable_registration()->CopyFrom(resp.registration());
-  e->set_role(resp.role());
-  if (resp.has_cluster_id()) {
-    e->set_cluster_id(resp.cluster_id());
-  }
-  return Status::OK();
-}
-
-} // anonymous namespace
-
 Status Master::ListMasters(vector<ServerEntryPB>* masters) const {
   if (!opts_.IsDistributed()) {
     ServerEntryPB local_entry;
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index 8b9d4e8..6a59435 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -109,12 +109,6 @@
   // based on local state.
   Status GetMasterHostPorts(std::vector<HostPort>* hostports) const;
 
-  // Crash the master on disk error.
-  static void CrashMasterOnDiskError(const std::string& uuid);
-
-  // Crash the master on CFile corruption.
-  static void CrashMasterOnCFileCorruption(const std::string& tablet_id);
-
   bool IsShutdown() const {
     return state_ == kStopped;
   }
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 9cc0ef0..58d84f5 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -291,7 +291,9 @@
                                 Tablet::DefaultBloomSizing(),
                                 roll_threshold);
     ASSERT_OK(rsw.Open());
-    ASSERT_OK(FlushCompactionInput(input, snap, HistoryGcOpts::Disabled(), &rsw));
+    ASSERT_OK(FlushCompactionInput(tablet()->metadata()->tablet_id(),
+                                   fs_manager()->block_manager()->error_manager(),
+                                   input, snap, HistoryGcOpts::Disabled(), &rsw));
     ASSERT_OK(rsw.Finish());
 
     vector<shared_ptr<RowSetMetadata> > metas;
@@ -467,7 +469,9 @@
                                     Tablet::DefaultBloomSizing(),
                                     1024 * 1024); // 1 MB
       ASSERT_OK(rdrsw.Open());
-      ASSERT_OK(FlushCompactionInput(compact_input.get(), merge_snap, HistoryGcOpts::Disabled(),
+      ASSERT_OK(FlushCompactionInput(tablet()->metadata()->tablet_id(),
+                                     fs_manager()->block_manager()->error_manager(),
+                                     compact_input.get(), merge_snap, HistoryGcOpts::Disabled(),
                                      &rdrsw));
       ASSERT_OK(rdrsw.Finish());
     }
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 42ef966..8b116ce 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -26,6 +26,7 @@
 #include <unordered_set>
 #include <vector>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/clock/hybrid_clock.h"
@@ -39,6 +40,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/opid_util.h"
+#include "kudu/fs/error_manager.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
@@ -55,10 +57,15 @@
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/memory/arena.h"
 
 using kudu::clock::HybridClock;
+using kudu::fault_injection::MaybeTrue;
 using kudu::fs::IOContext;
+using kudu::fs::FsErrorManager;
+using kudu::fs::KUDU_2233_CORRUPTION;
 using std::deque;
 using std::shared_ptr;
 using std::string;
@@ -67,6 +74,12 @@
 using std::vector;
 using strings::Substitute;
 
+DEFINE_double(tablet_inject_kudu_2233, 0,
+              "Fraction of the time that compactions that merge the history "
+              "of a single row spread across multiple rowsets will return "
+              "with a corruption status");
+TAG_FLAG(tablet_inject_kudu_2233, hidden);
+
 namespace kudu {
 namespace tablet {
 
@@ -755,7 +768,9 @@
 
 // If 'old_row' has previous versions, this transforms prior version in undos and adds them
 // to 'new_undo_head'.
-Status MergeDuplicatedRowHistory(CompactionInputRow* old_row,
+Status MergeDuplicatedRowHistory(const string& tablet_id,
+                                 const FsErrorManager* error_manager,
+                                 CompactionInputRow* old_row,
                                  Mutation** new_undo_head,
                                  Arena* arena) {
   if (PREDICT_TRUE(old_row->previous_ghost == nullptr)) return Status::OK();
@@ -786,9 +801,17 @@
                                                  &previous_ghost->row));
 
     // We should be left with only one redo, the delete.
-    CHECK(pv_delete_redo != nullptr);
-    CHECK(pv_delete_redo->changelist().is_delete());
-    CHECK(pv_delete_redo->next() == nullptr);
+    DCHECK(pv_delete_redo != nullptr);
+    DCHECK(pv_delete_redo->changelist().is_delete());
+    DCHECK(pv_delete_redo->next() == nullptr);
+    if (PREDICT_FALSE(
+        pv_delete_redo == nullptr ||
+        !pv_delete_redo->changelist().is_delete() ||
+        pv_delete_redo->next() ||
+        MaybeTrue(FLAGS_tablet_inject_kudu_2233))) {
+      error_manager->RunErrorNotificationCb(KUDU_2233_CORRUPTION, tablet_id);
+      return Status::Corruption("data was corrupted in a version prior to Kudu 1.7.0");
+    }
 
     // Now transform the redo delete into an undo (reinsert), which will contain the previous
     // ghost. The reinsert will have the timestamp of the delete.
@@ -1094,7 +1117,9 @@
   #undef ERROR_LOG_CONTEXT
 }
 
-Status FlushCompactionInput(CompactionInput* input,
+Status FlushCompactionInput(const string& tablet_id,
+                            const FsErrorManager* error_manager,
+                            CompactionInput* input,
                             const MvccSnapshot& snap,
                             const HistoryGcOpts& history_gc_opts,
                             RollingDiskRowSetWriter* out) {
@@ -1135,7 +1160,9 @@
                                                    &dst_row));
 
       // Merge the histories of 'input_row' with previous ghosts, if there are any.
-      RETURN_NOT_OK(MergeDuplicatedRowHistory(input_row,
+      RETURN_NOT_OK(MergeDuplicatedRowHistory(tablet_id,
+                                              error_manager,
+                                              input_row,
                                               &new_undos_head,
                                               input->PreparedBlockArena()));
 
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index d7060c3..c194fdf 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -36,6 +36,7 @@
 class Schema;
 
 namespace fs {
+class FsErrorManager;
 struct IOContext;
 }  // namespace fs
 
@@ -226,7 +227,9 @@
 //
 // After return of this function, this CompactionInput object is "used up" and will
 // no longer be useful.
-Status FlushCompactionInput(CompactionInput *input,
+Status FlushCompactionInput(const std::string& tablet_id,
+                            const fs::FsErrorManager* error_manager,
+                            CompactionInput* input,
                             const MvccSnapshot &snap,
                             const HistoryGcOpts& history_gc_opts,
                             RollingDiskRowSetWriter *out);
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index b633301..811605e 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -50,6 +50,7 @@
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/opid.pb.h"
+#include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/fs/io_context.h"
 #include "kudu/gutil/casts.h"
@@ -1583,7 +1584,8 @@
                "tablet_id", tablet_id(),
                "op", op_name);
 
-  const IOContext io_context({ tablet_id() });
+  const auto& tid = tablet_id();
+  const IOContext io_context({ tid });
 
   MvccSnapshot flush_snap(mvcc_);
   VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot). "
@@ -1603,8 +1605,11 @@
   RETURN_NOT_OK_PREPEND(drsw.Open(), "Failed to open DiskRowSet for flush");
 
   HistoryGcOpts history_gc_opts = GetHistoryGcOpts();
-  RETURN_NOT_OK_PREPEND(FlushCompactionInput(merge.get(), flush_snap, history_gc_opts, &drsw),
-                        "Flush to disk failed");
+  RETURN_NOT_OK_PREPEND(
+      FlushCompactionInput(
+          tid, metadata_->fs_manager()->block_manager()->error_manager(),
+          merge.get(), flush_snap, history_gc_opts, &drsw),
+      "Flush to disk failed");
   RETURN_NOT_OK_PREPEND(drsw.Finish(), "Failed to finish DRS writer");
 
   if (common_hooks_) {
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 3244ed2..74ccd03 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -177,6 +177,7 @@
 DECLARE_double(cfile_inject_corruption);
 DECLARE_double(env_inject_eio);
 DECLARE_double(env_inject_full);
+DECLARE_double(tablet_inject_kudu_2233);
 DECLARE_double(workload_score_upper_bound);
 DECLARE_int32(flush_threshold_mb);
 DECLARE_int32(flush_threshold_secs);
@@ -753,7 +754,8 @@
 
 enum class ErrorType {
   DISK_FAILURE,
-  CFILE_CORRUPTION
+  CFILE_CORRUPTION,
+  KUDU_2233_CORRUPTION,
 };
 
 class TabletServerDiskErrorTest : public TabletServerTestBase,
@@ -775,32 +777,38 @@
 };
 
 INSTANTIATE_TEST_CASE_P(ErrorType, TabletServerDiskErrorTest, ::testing::Values(
-    ErrorType::DISK_FAILURE, ErrorType::CFILE_CORRUPTION));
+    ErrorType::DISK_FAILURE, ErrorType::CFILE_CORRUPTION, ErrorType::KUDU_2233_CORRUPTION));
 
 // Test that applies random write operations to a tablet with a high
 // maintenance manager load and a non-zero error injection rate.
 TEST_P(TabletServerDiskErrorTest, TestRandomOpSequence) {
-  if (!AllowSlowTests()) {
-    LOG(INFO) << "Not running slow test. To run, use KUDU_ALLOW_SLOW_TESTS=1";
-    return;
-  }
+  SKIP_IF_SLOW_NOT_ALLOWED();
   typedef vector<RowOperationsPB::Type> OpTypeList;
   const OpTypeList kOpsIfKeyNotPresent = { RowOperationsPB::INSERT, RowOperationsPB::UPSERT };
-  const OpTypeList kOpsIfKeyPresent = { RowOperationsPB::UPSERT, RowOperationsPB::UPDATE,
-                                        RowOperationsPB::DELETE };
-  const int kMaxKey = 100000;
 
+  // If testing KUDU-2233 corruption, reduce the key-space significantly to
+  // make it more likely that rows will have history to merge, and delete more
+  // frequently to ensure a row's history is strewn across multiple rowsets.
+  const bool is_kudu_2233 = GetParam() == ErrorType::KUDU_2233_CORRUPTION;
+  const OpTypeList kOpsIfKeyPresent = is_kudu_2233 ?
+      OpTypeList{ RowOperationsPB::DELETE } :
+      OpTypeList{ RowOperationsPB::UPSERT, RowOperationsPB::UPDATE, RowOperationsPB::DELETE };
+  const int kMaxKey = is_kudu_2233 ? 10 : 100000;
+  if (is_kudu_2233) {
+    // Failures from KUDU-2233 are only caught during compactions; disable them
+    // for now so we can start them only after we begin injecting errors.
+    FLAGS_enable_rowset_compaction = false;
+ }
   if (GetParam() == ErrorType::DISK_FAILURE) {
     // Set these way up-front so we can change a single value to actually start
     // injecting errors. Inject errors into all data dirs but one.
-    FLAGS_crash_on_eio = false;
     const vector<string> failed_dirs = { mini_server_->options()->fs_opts.data_roots.begin() + 1,
                                          mini_server_->options()->fs_opts.data_roots.end() };
     FLAGS_env_inject_eio_globs = JoinStrings(JoinPathSegmentsV(failed_dirs, "**"), ",");
   }
 
   set<int> keys;
-  const auto GetRandomString = [] {
+  const auto GetRandomString = [&] {
     return StringPrintf("%d", rand() % kMaxKey);
   };
 
@@ -816,13 +824,12 @@
     RpcController controller;
     RowOperationsPB::Type op_type;
     int key = rand() % kMaxKey;
-    auto key_iter = keys.find(key);
-    if (key_iter == keys.end()) {
-      // If the key already exists, insert or upsert.
-      op_type = kOpsIfKeyNotPresent[rand() % kOpsIfKeyNotPresent.size()];
-    } else {
-      // ... else we can do anything but insert.
+    if (ContainsKey(keys, key)) {
+      // If the key already exists, update, upsert, or delete it.
       op_type = kOpsIfKeyPresent[rand() % kOpsIfKeyPresent.size()];
+    } else {
+      // ... otherwise, insert or upsert.
+      op_type = kOpsIfKeyNotPresent[rand() % kOpsIfKeyNotPresent.size()];
     }
 
     // Add the op to the request.
@@ -832,7 +839,7 @@
       keys.insert(key);
     } else {
       AddTestKeyToPB(RowOperationsPB::DELETE, schema_, key, req.mutable_row_operations());
-      keys.erase(key_iter);
+      keys.erase(key);
     }
 
     // Finally, write to the server and log the response.
@@ -841,7 +848,8 @@
     return resp.has_error() ?  StatusFromPB(resp.error().status()) : Status::OK();
   };
 
-  // Perform some arbitrarily large number of ops, with some pauses to encourage flushes.
+  // Perform some arbitrarily large number of ops, with some pauses to
+  // encourage flushes.
   for (int i = 0; i < 500; i++) {
     if (i % 10) {
       SleepFor(MonoDelta::FromMilliseconds(100));
@@ -857,6 +865,12 @@
     case ErrorType::CFILE_CORRUPTION:
       FLAGS_cfile_inject_corruption = 0.01;
       break;
+    case ErrorType::KUDU_2233_CORRUPTION:
+      // KUDU-2233 errors only get triggered with very specific compactions, so
+      // bump the failure rate all the way up.
+      FLAGS_tablet_inject_kudu_2233 = 1.0;
+      FLAGS_enable_rowset_compaction = true;
+      break;
   }
 
   // The tablet will eventually be failed and will not be able to accept
@@ -937,6 +951,23 @@
   });
 }
 
+TEST_F(TabletServerTest, TestFailReplicaOnKUDU2233Corruption) {
+  FLAGS_tablet_inject_kudu_2233 = 1;
+  // Trigger the code paths that used to crash a server in the case of
+  // KUDU-2233 compactions, i.e. where a compaction needs to merge the history
+  // of a single row across multiple rowsets.
+  NO_FATALS(InsertTestRowsRemote(1, 1));
+  NO_FATALS(DeleteTestRowsRemote(1, 1));
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  NO_FATALS(InsertTestRowsRemote(1, 1));
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  Status s = tablet_replica_->tablet()->Compact(Tablet::FORCE_COMPACT_ALL);
+  ASSERT_TRUE(s.IsCorruption());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(tablet::FAILED, tablet_replica_->state());
+  });
+}
+
 class TabletServerMaintenanceMemoryPressureTest : public TabletServerTestBase {
  public:
   void SetUp() override {
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 3bdf11e..c8b87f3 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -129,6 +129,10 @@
       ErrorHandlerType::CFILE_CORRUPTION, [this](const string& uuid) {
         this->tablet_manager_->FailTabletAndScheduleShutdown(uuid);
       });
+  fs_manager_->SetErrorNotificationCb(
+      ErrorHandlerType::KUDU_2233_CORRUPTION, [this](const string& uuid) {
+        this->tablet_manager_->FailTabletAndScheduleShutdown(uuid);
+      });
 
   unique_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
   unique_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));