KUDU-3191: fail replicas when KUDU-2233 is detected
Despite the longstanding fixes that stop bad KUDU-2233 compactions,
users still see the results of already corrupted data, particularly when
upgrading to newer versions that may compact more aggressively than
older versions.
Rather than crashing when hitting a KUDU-2233 failure, this patch
updates the behavior to fail the replica. Similar to disk failures or
CFile checksum corruption, this will trigger re-replication to happen,
and eviction will only happen if there is a healthy majority.
The hope is that fewer users will see this corruption cause problems, as
the corruption will henceforth not crash servers, and only tablets with
a majority corrupted will be unavailable.
Change-Id: I43570b961dfd5eb8518328121585255d32cf2ebb
Reviewed-on: http://gerrit.cloudera.org:8080/16471
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index b045223..8a32d57 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -115,6 +115,9 @@
// For CFile corruptions.
CFILE_CORRUPTION,
+
+ // For broken invariants caused by KUDU-2233.
+ KUDU_2233_CORRUPTION,
};
// When certain operations fail, the side effects of the error can span multiple
diff --git a/src/kudu/integration-tests/disk_failure-itest.cc b/src/kudu/integration-tests/disk_failure-itest.cc
index e749103..0feaca4 100644
--- a/src/kudu/integration-tests/disk_failure-itest.cc
+++ b/src/kudu/integration-tests/disk_failure-itest.cc
@@ -30,6 +30,8 @@
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/fs/block_manager.h"
#include "kudu/gutil/strings/substitute.h"
@@ -38,6 +40,7 @@
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/tablet/tablet.pb.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -46,15 +49,24 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+METRIC_DECLARE_entity(tablet);
+METRIC_DECLARE_gauge_size(num_rowsets_on_disk);
METRIC_DECLARE_gauge_uint64(data_dirs_failed);
METRIC_DECLARE_gauge_uint32(tablets_num_failed);
+using kudu::client::sp::shared_ptr;
+using kudu::client::KuduDelete;
+using kudu::client::KuduInsert;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
using kudu::cluster::ExternalDaemon;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::cluster::ExternalTabletServer;
using kudu::fs::BlockManager;
+using kudu::KuduPartialRow;
using std::pair;
using std::string;
+using std::unique_ptr;
using std::vector;
using strings::Substitute;
@@ -158,6 +170,7 @@
enum class ErrorType {
CFILE_CORRUPTION,
DISK_FAILURE,
+ KUDU_2233_CORRUPTION,
};
class DiskErrorITestBase : public ExternalMiniClusterITestBase,
@@ -190,6 +203,9 @@
case ErrorType::CFILE_CORRUPTION:
injection_flags.emplace_back("cfile_inject_corruption", "1.0");
break;
+ case ErrorType::KUDU_2233_CORRUPTION:
+ injection_flags.emplace_back("tablet_inject_kudu_2233", "1.0");
+ break;
}
return injection_flags;
}
@@ -252,6 +268,7 @@
// First, stop injecting errors.
{ "env_inject_eio", "0.0" },
{ "cfile_inject_corruption", "0.0" },
+ { "tablet_inject_kudu_2233", "0.0" },
// Then allow for recovery.
{ "enable_tablet_copy", "true" },
@@ -319,6 +336,94 @@
NO_FATALS(v.CheckCluster());
}
+// Test targeting KUDU-2233, though reused for additional coverage of CFile
+// checksum failures and disk errors.
+class CompactionsAndDeletionsFailureITest : public TabletServerDiskErrorITest {
+ public:
+ void SetUp() override {
+ ExternalMiniClusterOptions opts;
+ opts.num_tablet_servers = 3;
+ opts.num_data_dirs = kNumDataDirs;
+ opts.extra_tserver_flags = {
+ // Flush frequently so we actually get some data blocks.
+ "--flush_threshold_secs=1",
+ "--flush_threshold_mb=1",
+ // Prevent compactions so we can explicitly enable them later.
+ "--enable_rowset_compaction=false",
+ // Prevent tablet copies so we can explicitly enable and monitor them.
+ "--enable_tablet_copy=false",
+ };
+ NO_FATALS(StartClusterWithOpts(std::move(opts)));
+ workload_.reset(new TestWorkload(cluster_.get()));
+ workload_->set_num_tablets(1);
+ workload_->Setup();
+ }
+ protected:
+ unique_ptr<TestWorkload> workload_;
+};
+
+TEST_P(CompactionsAndDeletionsFailureITest, TestRecovery) {
+ constexpr const int kKeyMax = 10;
+
+ // Insert and delete rows, and continue to insert until we get multiple
+ // rowsets.
+ shared_ptr<KuduSession> session = client_->NewSession();
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(workload_->table_name(), &table));
+ const auto insert_row = [&] (int key) {
+ std::unique_ptr<KuduInsert> insert(table->NewInsert());
+ KuduPartialRow* row = insert->mutable_row();
+ RETURN_NOT_OK(row->SetInt32("key", key));
+ RETURN_NOT_OK(row->SetInt32("int_val", key));
+ RETURN_NOT_OK(session->Apply(insert.release()));
+ return session->Flush();
+ };
+ const auto delete_row = [&] (int key) {
+ std::unique_ptr<KuduDelete> del(table->NewDelete());
+ KuduPartialRow* row = del->mutable_row();
+ RETURN_NOT_OK(row->SetInt32("key", key));
+ RETURN_NOT_OK(session->Apply(del.release()));
+ return session->Flush();
+ };
+ const auto get_num_diskrowsets = [&] (const ExternalTabletServer* ets,
+ const char* tablet_id) {
+ int64_t num_drss = 0;
+ CHECK_OK(itest::GetInt64Metric(ets->bound_http_hostport(),
+ &METRIC_ENTITY_tablet, tablet_id, &METRIC_num_rowsets_on_disk, "value", &num_drss));
+ return num_drss;
+ };
+ ExternalTabletServer* error_ts = cluster_->tablet_server(0);
+ itest::TServerDetails* ts = ts_map_[error_ts->uuid()];
+ vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+ ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 1, MonoDelta::FromSeconds(32), &tablets));
+ ASSERT_EQ(1, tablets.size());
+ const string& tablet_id = tablets[0].tablet_status().tablet_id();
+ int num_inserts = 0;
+ while (get_num_diskrowsets(error_ts, tablet_id.c_str()) < 2) {
+ auto key = num_inserts++ % kKeyMax;
+ ASSERT_OK(insert_row(key));
+ ASSERT_OK(delete_row(key));
+ }
+ // Enable compactions, which will trigger KUDU-2233 codepaths.
+ auto flag_list = InjectionFlags(GetParam(), error_ts);
+ flag_list.emplace_back("enable_rowset_compaction", "true");
+ ASSERT_OK(SetFlags(error_ts, flag_list));
+
+ // The tablet replica should fail and be re-replicated back up to full
+ // health once allowed.
+ NO_FATALS(WaitForFailedTablets(error_ts, 1));
+ ASSERT_OK(AllowRecovery());
+
+ // Wait for the cluster to return to a healthy state.
+ ClusterVerifier v(cluster_.get());
+ NO_FATALS(v.CheckCluster());
+}
+
+INSTANTIATE_TEST_CASE_P(ErrorType, CompactionsAndDeletionsFailureITest,
+ ::testing::Values(ErrorType::CFILE_CORRUPTION, ErrorType::DISK_FAILURE,
+ ErrorType::KUDU_2233_CORRUPTION));
+
class MasterDiskErrorITest : public DiskErrorITestBase {
};
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index f86776b..6efe084 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -108,6 +108,49 @@
namespace kudu {
namespace master {
+namespace {
+constexpr const char* kReplaceMasterMessage =
+ "this master may return incorrect results and should be replaced";
+void CrashMasterOnDiskError(const string& uuid) {
+ LOG(FATAL) << Substitute("Disk error detected on data directory $0: $1",
+ uuid, kReplaceMasterMessage);
+}
+void CrashMasterOnCFileCorruption(const string& tablet_id) {
+ LOG(FATAL) << Substitute("CFile corruption detected on system catalog $0: $1",
+ tablet_id, kReplaceMasterMessage);
+}
+void CrashMasterOnKudu2233Corruption(const string& tablet_id) {
+ LOG(FATAL) << Substitute("KUDU-2233 corruption detected on system catalog $0: $1 ",
+ tablet_id, kReplaceMasterMessage);
+}
+
+// TODO(Alex Feinberg) this method should be moved to a separate class (along with
+// ListMasters), so that it can also be used in TS and client when
+// bootstrapping.
+Status GetMasterEntryForHost(const shared_ptr<rpc::Messenger>& messenger,
+ const HostPort& hostport,
+ ServerEntryPB* e) {
+ Sockaddr sockaddr;
+ RETURN_NOT_OK(SockaddrFromHostPort(hostport, &sockaddr));
+ MasterServiceProxy proxy(messenger, sockaddr, hostport.host());
+ GetMasterRegistrationRequestPB req;
+ GetMasterRegistrationResponsePB resp;
+ rpc::RpcController controller;
+ controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_master_registration_rpc_timeout_ms));
+ RETURN_NOT_OK(proxy.GetMasterRegistration(req, &resp, &controller));
+ e->mutable_instance_id()->CopyFrom(resp.instance_id());
+ if (resp.has_error()) {
+ return StatusFromPB(resp.error().status());
+ }
+ e->mutable_registration()->CopyFrom(resp.registration());
+ e->set_role(resp.role());
+ if (resp.has_cluster_id()) {
+ e->set_cluster_id(resp.cluster_id());
+ }
+ return Status::OK();
+}
+} // anonymous namespace
+
Master::Master(const MasterOptions& opts)
: KuduServer("Master", opts, "kudu.master"),
state_(kStopped),
@@ -170,6 +213,8 @@
&CrashMasterOnDiskError);
fs_manager_->SetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION,
&CrashMasterOnCFileCorruption);
+ fs_manager_->SetErrorNotificationCb(ErrorHandlerType::KUDU_2233_CORRUPTION,
+ &CrashMasterOnKudu2233Corruption);
RETURN_NOT_OK(maintenance_manager_->Start());
@@ -290,44 +335,6 @@
state_ = kStopped;
}
-void Master::CrashMasterOnDiskError(const string& uuid) {
- LOG(FATAL) << Substitute("Disk error detected on data directory $0", uuid);
-}
-
-void Master::CrashMasterOnCFileCorruption(const string& tablet_id) {
- LOG(FATAL) << Substitute("CFile corruption detected on system catalog $0", tablet_id);
-}
-
-namespace {
-
-// TODO(Alex Feinberg) this method should be moved to a separate class (along with
-// ListMasters), so that it can also be used in TS and client when
-// bootstrapping.
-Status GetMasterEntryForHost(const shared_ptr<rpc::Messenger>& messenger,
- const HostPort& hostport,
- ServerEntryPB* e) {
- Sockaddr sockaddr;
- RETURN_NOT_OK(SockaddrFromHostPort(hostport, &sockaddr));
- MasterServiceProxy proxy(messenger, sockaddr, hostport.host());
- GetMasterRegistrationRequestPB req;
- GetMasterRegistrationResponsePB resp;
- rpc::RpcController controller;
- controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_master_registration_rpc_timeout_ms));
- RETURN_NOT_OK(proxy.GetMasterRegistration(req, &resp, &controller));
- e->mutable_instance_id()->CopyFrom(resp.instance_id());
- if (resp.has_error()) {
- return StatusFromPB(resp.error().status());
- }
- e->mutable_registration()->CopyFrom(resp.registration());
- e->set_role(resp.role());
- if (resp.has_cluster_id()) {
- e->set_cluster_id(resp.cluster_id());
- }
- return Status::OK();
-}
-
-} // anonymous namespace
-
Status Master::ListMasters(vector<ServerEntryPB>* masters) const {
if (!opts_.IsDistributed()) {
ServerEntryPB local_entry;
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index 8b9d4e8..6a59435 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -109,12 +109,6 @@
// based on local state.
Status GetMasterHostPorts(std::vector<HostPort>* hostports) const;
- // Crash the master on disk error.
- static void CrashMasterOnDiskError(const std::string& uuid);
-
- // Crash the master on CFile corruption.
- static void CrashMasterOnCFileCorruption(const std::string& tablet_id);
-
bool IsShutdown() const {
return state_ == kStopped;
}
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 9cc0ef0..58d84f5 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -291,7 +291,9 @@
Tablet::DefaultBloomSizing(),
roll_threshold);
ASSERT_OK(rsw.Open());
- ASSERT_OK(FlushCompactionInput(input, snap, HistoryGcOpts::Disabled(), &rsw));
+ ASSERT_OK(FlushCompactionInput(tablet()->metadata()->tablet_id(),
+ fs_manager()->block_manager()->error_manager(),
+ input, snap, HistoryGcOpts::Disabled(), &rsw));
ASSERT_OK(rsw.Finish());
vector<shared_ptr<RowSetMetadata> > metas;
@@ -467,7 +469,9 @@
Tablet::DefaultBloomSizing(),
1024 * 1024); // 1 MB
ASSERT_OK(rdrsw.Open());
- ASSERT_OK(FlushCompactionInput(compact_input.get(), merge_snap, HistoryGcOpts::Disabled(),
+ ASSERT_OK(FlushCompactionInput(tablet()->metadata()->tablet_id(),
+ fs_manager()->block_manager()->error_manager(),
+ compact_input.get(), merge_snap, HistoryGcOpts::Disabled(),
&rdrsw));
ASSERT_OK(rdrsw.Finish());
}
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 42ef966..8b116ce 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -26,6 +26,7 @@
#include <unordered_set>
#include <vector>
+#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/clock/hybrid_clock.h"
@@ -39,6 +40,7 @@
#include "kudu/common/schema.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
+#include "kudu/fs/error_manager.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
@@ -55,10 +57,15 @@
#include "kudu/tablet/tablet.pb.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
#include "kudu/util/memory/arena.h"
using kudu::clock::HybridClock;
+using kudu::fault_injection::MaybeTrue;
using kudu::fs::IOContext;
+using kudu::fs::FsErrorManager;
+using kudu::fs::KUDU_2233_CORRUPTION;
using std::deque;
using std::shared_ptr;
using std::string;
@@ -67,6 +74,12 @@
using std::vector;
using strings::Substitute;
+DEFINE_double(tablet_inject_kudu_2233, 0,
+ "Fraction of the time that compactions that merge the history "
+ "of a single row spread across multiple rowsets will return "
+ "with a corruption status");
+TAG_FLAG(tablet_inject_kudu_2233, hidden);
+
namespace kudu {
namespace tablet {
@@ -755,7 +768,9 @@
// If 'old_row' has previous versions, this transforms prior version in undos and adds them
// to 'new_undo_head'.
-Status MergeDuplicatedRowHistory(CompactionInputRow* old_row,
+Status MergeDuplicatedRowHistory(const string& tablet_id,
+ const FsErrorManager* error_manager,
+ CompactionInputRow* old_row,
Mutation** new_undo_head,
Arena* arena) {
if (PREDICT_TRUE(old_row->previous_ghost == nullptr)) return Status::OK();
@@ -786,9 +801,17 @@
&previous_ghost->row));
// We should be left with only one redo, the delete.
- CHECK(pv_delete_redo != nullptr);
- CHECK(pv_delete_redo->changelist().is_delete());
- CHECK(pv_delete_redo->next() == nullptr);
+ DCHECK(pv_delete_redo != nullptr);
+ DCHECK(pv_delete_redo->changelist().is_delete());
+ DCHECK(pv_delete_redo->next() == nullptr);
+ if (PREDICT_FALSE(
+ pv_delete_redo == nullptr ||
+ !pv_delete_redo->changelist().is_delete() ||
+ pv_delete_redo->next() ||
+ MaybeTrue(FLAGS_tablet_inject_kudu_2233))) {
+ error_manager->RunErrorNotificationCb(KUDU_2233_CORRUPTION, tablet_id);
+ return Status::Corruption("data was corrupted in a version prior to Kudu 1.7.0");
+ }
// Now transform the redo delete into an undo (reinsert), which will contain the previous
// ghost. The reinsert will have the timestamp of the delete.
@@ -1094,7 +1117,9 @@
#undef ERROR_LOG_CONTEXT
}
-Status FlushCompactionInput(CompactionInput* input,
+Status FlushCompactionInput(const string& tablet_id,
+ const FsErrorManager* error_manager,
+ CompactionInput* input,
const MvccSnapshot& snap,
const HistoryGcOpts& history_gc_opts,
RollingDiskRowSetWriter* out) {
@@ -1135,7 +1160,9 @@
&dst_row));
// Merge the histories of 'input_row' with previous ghosts, if there are any.
- RETURN_NOT_OK(MergeDuplicatedRowHistory(input_row,
+ RETURN_NOT_OK(MergeDuplicatedRowHistory(tablet_id,
+ error_manager,
+ input_row,
&new_undos_head,
input->PreparedBlockArena()));
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index d7060c3..c194fdf 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -36,6 +36,7 @@
class Schema;
namespace fs {
+class FsErrorManager;
struct IOContext;
} // namespace fs
@@ -226,7 +227,9 @@
//
// After return of this function, this CompactionInput object is "used up" and will
// no longer be useful.
-Status FlushCompactionInput(CompactionInput *input,
+Status FlushCompactionInput(const std::string& tablet_id,
+ const fs::FsErrorManager* error_manager,
+ CompactionInput* input,
const MvccSnapshot &snap,
const HistoryGcOpts& history_gc_opts,
RollingDiskRowSetWriter *out);
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index b633301..811605e 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -50,6 +50,7 @@
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
+#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/io_context.h"
#include "kudu/gutil/casts.h"
@@ -1583,7 +1584,8 @@
"tablet_id", tablet_id(),
"op", op_name);
- const IOContext io_context({ tablet_id() });
+ const auto& tid = tablet_id();
+ const IOContext io_context({ tid });
MvccSnapshot flush_snap(mvcc_);
VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot). "
@@ -1603,8 +1605,11 @@
RETURN_NOT_OK_PREPEND(drsw.Open(), "Failed to open DiskRowSet for flush");
HistoryGcOpts history_gc_opts = GetHistoryGcOpts();
- RETURN_NOT_OK_PREPEND(FlushCompactionInput(merge.get(), flush_snap, history_gc_opts, &drsw),
- "Flush to disk failed");
+ RETURN_NOT_OK_PREPEND(
+ FlushCompactionInput(
+ tid, metadata_->fs_manager()->block_manager()->error_manager(),
+ merge.get(), flush_snap, history_gc_opts, &drsw),
+ "Flush to disk failed");
RETURN_NOT_OK_PREPEND(drsw.Finish(), "Failed to finish DRS writer");
if (common_hooks_) {
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 3244ed2..74ccd03 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -177,6 +177,7 @@
DECLARE_double(cfile_inject_corruption);
DECLARE_double(env_inject_eio);
DECLARE_double(env_inject_full);
+DECLARE_double(tablet_inject_kudu_2233);
DECLARE_double(workload_score_upper_bound);
DECLARE_int32(flush_threshold_mb);
DECLARE_int32(flush_threshold_secs);
@@ -753,7 +754,8 @@
enum class ErrorType {
DISK_FAILURE,
- CFILE_CORRUPTION
+ CFILE_CORRUPTION,
+ KUDU_2233_CORRUPTION,
};
class TabletServerDiskErrorTest : public TabletServerTestBase,
@@ -775,32 +777,38 @@
};
INSTANTIATE_TEST_CASE_P(ErrorType, TabletServerDiskErrorTest, ::testing::Values(
- ErrorType::DISK_FAILURE, ErrorType::CFILE_CORRUPTION));
+ ErrorType::DISK_FAILURE, ErrorType::CFILE_CORRUPTION, ErrorType::KUDU_2233_CORRUPTION));
// Test that applies random write operations to a tablet with a high
// maintenance manager load and a non-zero error injection rate.
TEST_P(TabletServerDiskErrorTest, TestRandomOpSequence) {
- if (!AllowSlowTests()) {
- LOG(INFO) << "Not running slow test. To run, use KUDU_ALLOW_SLOW_TESTS=1";
- return;
- }
+ SKIP_IF_SLOW_NOT_ALLOWED();
typedef vector<RowOperationsPB::Type> OpTypeList;
const OpTypeList kOpsIfKeyNotPresent = { RowOperationsPB::INSERT, RowOperationsPB::UPSERT };
- const OpTypeList kOpsIfKeyPresent = { RowOperationsPB::UPSERT, RowOperationsPB::UPDATE,
- RowOperationsPB::DELETE };
- const int kMaxKey = 100000;
+ // If testing KUDU-2233 corruption, reduce the key-space significantly to
+ // make it more likely that rows will have history to merge, and delete more
+ // frequently to ensure a row's history is strewn across multiple rowsets.
+ const bool is_kudu_2233 = GetParam() == ErrorType::KUDU_2233_CORRUPTION;
+ const OpTypeList kOpsIfKeyPresent = is_kudu_2233 ?
+ OpTypeList{ RowOperationsPB::DELETE } :
+ OpTypeList{ RowOperationsPB::UPSERT, RowOperationsPB::UPDATE, RowOperationsPB::DELETE };
+ const int kMaxKey = is_kudu_2233 ? 10 : 100000;
+ if (is_kudu_2233) {
+ // Failures from KUDU-2233 are only caught during compactions; disable them
+ // for now so we can start them only after we begin injecting errors.
+ FLAGS_enable_rowset_compaction = false;
+ }
if (GetParam() == ErrorType::DISK_FAILURE) {
// Set these way up-front so we can change a single value to actually start
// injecting errors. Inject errors into all data dirs but one.
- FLAGS_crash_on_eio = false;
const vector<string> failed_dirs = { mini_server_->options()->fs_opts.data_roots.begin() + 1,
mini_server_->options()->fs_opts.data_roots.end() };
FLAGS_env_inject_eio_globs = JoinStrings(JoinPathSegmentsV(failed_dirs, "**"), ",");
}
set<int> keys;
- const auto GetRandomString = [] {
+ const auto GetRandomString = [&] {
return StringPrintf("%d", rand() % kMaxKey);
};
@@ -816,13 +824,12 @@
RpcController controller;
RowOperationsPB::Type op_type;
int key = rand() % kMaxKey;
- auto key_iter = keys.find(key);
- if (key_iter == keys.end()) {
- // If the key already exists, insert or upsert.
- op_type = kOpsIfKeyNotPresent[rand() % kOpsIfKeyNotPresent.size()];
- } else {
- // ... else we can do anything but insert.
+ if (ContainsKey(keys, key)) {
+ // If the key already exists, update, upsert, or delete it.
op_type = kOpsIfKeyPresent[rand() % kOpsIfKeyPresent.size()];
+ } else {
+ // ... otherwise, insert or upsert.
+ op_type = kOpsIfKeyNotPresent[rand() % kOpsIfKeyNotPresent.size()];
}
// Add the op to the request.
@@ -832,7 +839,7 @@
keys.insert(key);
} else {
AddTestKeyToPB(RowOperationsPB::DELETE, schema_, key, req.mutable_row_operations());
- keys.erase(key_iter);
+ keys.erase(key);
}
// Finally, write to the server and log the response.
@@ -841,7 +848,8 @@
return resp.has_error() ? StatusFromPB(resp.error().status()) : Status::OK();
};
- // Perform some arbitrarily large number of ops, with some pauses to encourage flushes.
+ // Perform some arbitrarily large number of ops, with some pauses to
+ // encourage flushes.
for (int i = 0; i < 500; i++) {
if (i % 10) {
SleepFor(MonoDelta::FromMilliseconds(100));
@@ -857,6 +865,12 @@
case ErrorType::CFILE_CORRUPTION:
FLAGS_cfile_inject_corruption = 0.01;
break;
+ case ErrorType::KUDU_2233_CORRUPTION:
+ // KUDU-2233 errors only get triggered with very specific compactions, so
+ // bump the failure rate all the way up.
+ FLAGS_tablet_inject_kudu_2233 = 1.0;
+ FLAGS_enable_rowset_compaction = true;
+ break;
}
// The tablet will eventually be failed and will not be able to accept
@@ -937,6 +951,23 @@
});
}
+TEST_F(TabletServerTest, TestFailReplicaOnKUDU2233Corruption) {
+ FLAGS_tablet_inject_kudu_2233 = 1;
+ // Trigger the code paths that crash a server in the case of KUDU-2233
+ // compactions, i.e. where a compaction needs to merge the history of a
+ // single row across multiple rowsets.
+ NO_FATALS(InsertTestRowsRemote(1, 1));
+ NO_FATALS(DeleteTestRowsRemote(1, 1));
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+ NO_FATALS(InsertTestRowsRemote(1, 1));
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+ Status s = tablet_replica_->tablet()->Compact(Tablet::FORCE_COMPACT_ALL);
+ ASSERT_TRUE(s.IsCorruption());
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(tablet::FAILED, tablet_replica_->state());
+ });
+}
+
class TabletServerMaintenanceMemoryPressureTest : public TabletServerTestBase {
public:
void SetUp() override {
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 3bdf11e..c8b87f3 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -129,6 +129,10 @@
ErrorHandlerType::CFILE_CORRUPTION, [this](const string& uuid) {
this->tablet_manager_->FailTabletAndScheduleShutdown(uuid);
});
+ fs_manager_->SetErrorNotificationCb(
+ ErrorHandlerType::KUDU_2233_CORRUPTION, [this](const string& uuid) {
+ this->tablet_manager_->FailTabletAndScheduleShutdown(uuid);
+ });
unique_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
unique_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));