feat(replication): make replication delay configurable (#3087)
Co-authored-by: Twice <twice.mliu@gmail.com>
Co-authored-by: hulk <hulk.website@gmail.com>
diff --git a/kvrocks.conf b/kvrocks.conf
index b69e264..65a820e 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -193,6 +193,18 @@
# future 'clusterx setnodes' commands because the replication thread is blocked on recv.
replication-recv-timeout-ms 3200
+# Maximum bytes to buffer before sending replication data to replicas.
+# The master will pack multiple write batches into one bulk to reduce network overhead,
+# but will send immediately if the bulk size exceeds this limit.
+# Default: 16KB (16384 bytes)
+replication-delay-bytes 16384
+
+# Maximum number of updates to buffer before sending replication data to replicas.
+# The master will pack multiple write batches into one bulk to reduce network overhead,
+# but will send immediately if the number of updates exceeds this limit.
+# Default: 16 updates
+replication-delay-updates 16
+
# TCP listen() backlog.
#
# In high requests-per-second environments you need an high backlog in order
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 4cc51bb..13a4151 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -56,6 +56,14 @@
#include <openssl/ssl.h>
#endif
+FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq)
+ : srv_(srv),
+ conn_(conn),
+ next_repl_seq_(next_repl_seq),
+ req_(srv),
+ max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
+ max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
+
Status FeedSlaveThread::Start() {
auto s = util::CreateThread("feed-replica", [this] {
sigset_t mask, omask;
@@ -194,8 +202,8 @@
// 3. To avoid master don't send replication stream to slave since of packing
// batches strategy, we still send batches if current batch sequence is less
// kMaxDelayUpdates than latest sequence.
- if (is_first_repl_batch || batches_bulk.size() >= kMaxDelayBytes || updates_in_batches >= kMaxDelayUpdates ||
- srv_->storage->LatestSeqNumber() - batch.sequence <= kMaxDelayUpdates) {
+ if (is_first_repl_batch || batches_bulk.size() >= max_delay_bytes_ || updates_in_batches >= max_delay_updates_ ||
+ srv_->storage->LatestSeqNumber() - batch.sequence <= max_delay_updates_) {
// Send entire bulk which contain multiple batches
auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent());
if (!s.IsOK()) {
@@ -205,7 +213,7 @@
}
is_first_repl_batch = false;
batches_bulk.clear();
- if (batches_bulk.capacity() > kMaxDelayBytes * 2) batches_bulk.shrink_to_fit();
+ if (batches_bulk.capacity() > max_delay_bytes_ * 2) batches_bulk.shrink_to_fit();
updates_in_batches = 0;
}
curr_seq = batch.sequence + batch.writeBatchPtr->Count();
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 11edbd6..547c210 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -64,8 +64,7 @@
class FeedSlaveThread {
public:
- explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq)
- : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq), req_(srv) {}
+ explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq);
~FeedSlaveThread() = default;
Status Start();
@@ -87,8 +86,9 @@
redis::Request req_;
std::atomic<rocksdb::SequenceNumber> ack_seq_ = 0;
- static const size_t kMaxDelayUpdates = 16;
- static const size_t kMaxDelayBytes = 16 * 1024;
+ // Configurable delay limits
+ size_t max_delay_bytes_;
+ size_t max_delay_updates_;
void loop();
void checkLivenessIfNeed();
diff --git a/src/config/config.cc b/src/config/config.cc
index f93a094..09770ee 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -203,6 +203,8 @@
{"slave-read-only", false, new YesNoField(&slave_readonly, true)},
{"replication-connect-timeout-ms", false, new IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
{"replication-recv-timeout-ms", false, new IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
+ {"replication-delay-bytes", false, new IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
+ {"replication-delay-updates", false, new IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
{"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
diff --git a/src/config/config.h b/src/config/config.h
index 2dd4157..2e774c9 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -120,6 +120,8 @@
int slave_priority = 100;
int replication_connect_timeout_ms = 3100;
int replication_recv_timeout_ms = 3200;
+ int max_replication_delay_bytes = 16 * 1024; // 16KB default
+ int max_replication_delay_updates = 16; // 16 updates default
int max_db_size = 0;
int max_replication_mb = 0;
int max_io_mb = 0;