feat(replication): make replication delay configurable (#3087)

Co-authored-by: Twice <twice.mliu@gmail.com>
Co-authored-by: hulk <hulk.website@gmail.com>
diff --git a/kvrocks.conf b/kvrocks.conf
index b69e264..65a820e 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -193,6 +193,18 @@
 # future 'clusterx setnodes' commands because the replication thread is blocked on recv.
 replication-recv-timeout-ms 3200
 
+# Maximum bytes to buffer before sending replication data to replicas.
+# The master will pack multiple write batches into one bulk to reduce network overhead,
+# but will send immediately if the bulk size exceeds this limit.
+# Default: 16KB (16384 bytes)
+replication-delay-bytes 16384
+
+# Maximum number of updates to buffer before sending replication data to replicas.
+# The master will pack multiple write batches into one bulk to reduce network overhead,
+# but will send immediately if the number of updates exceeds this limit.
+# Default: 16 updates
+replication-delay-updates 16
+
 # TCP listen() backlog.
 #
 # In high requests-per-second environments you need an high backlog in order
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 4cc51bb..13a4151 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -56,6 +56,14 @@
 #include <openssl/ssl.h>
 #endif
 
+FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq)
+    : srv_(srv),
+      conn_(conn),
+      next_repl_seq_(next_repl_seq),
+      req_(srv),
+      max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
+      max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
+
 Status FeedSlaveThread::Start() {
   auto s = util::CreateThread("feed-replica", [this] {
     sigset_t mask, omask;
@@ -194,8 +202,8 @@
     // 3. To avoid master don't send replication stream to slave since of packing
     //    batches strategy, we still send batches if current batch sequence is less
     //    kMaxDelayUpdates than latest sequence.
-    if (is_first_repl_batch || batches_bulk.size() >= kMaxDelayBytes || updates_in_batches >= kMaxDelayUpdates ||
-        srv_->storage->LatestSeqNumber() - batch.sequence <= kMaxDelayUpdates) {
+    if (is_first_repl_batch || batches_bulk.size() >= max_delay_bytes_ || updates_in_batches >= max_delay_updates_ ||
+        srv_->storage->LatestSeqNumber() - batch.sequence <= max_delay_updates_) {
       // Send entire bulk which contain multiple batches
       auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent());
       if (!s.IsOK()) {
@@ -205,7 +213,7 @@
       }
       is_first_repl_batch = false;
       batches_bulk.clear();
-      if (batches_bulk.capacity() > kMaxDelayBytes * 2) batches_bulk.shrink_to_fit();
+      if (batches_bulk.capacity() > max_delay_bytes_ * 2) batches_bulk.shrink_to_fit();
       updates_in_batches = 0;
     }
     curr_seq = batch.sequence + batch.writeBatchPtr->Count();
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 11edbd6..547c210 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -64,8 +64,7 @@
 
 class FeedSlaveThread {
  public:
-  explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq)
-      : srv_(srv), conn_(conn), next_repl_seq_(next_repl_seq), req_(srv) {}
+  explicit FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq);
   ~FeedSlaveThread() = default;
 
   Status Start();
@@ -87,8 +86,9 @@
   redis::Request req_;
   std::atomic<rocksdb::SequenceNumber> ack_seq_ = 0;
 
-  static const size_t kMaxDelayUpdates = 16;
-  static const size_t kMaxDelayBytes = 16 * 1024;
+  // Configurable delay limits
+  size_t max_delay_bytes_;
+  size_t max_delay_updates_;
 
   void loop();
   void checkLivenessIfNeed();
diff --git a/src/config/config.cc b/src/config/config.cc
index f93a094..09770ee 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -203,6 +203,8 @@
       {"slave-read-only", false, new YesNoField(&slave_readonly, true)},
       {"replication-connect-timeout-ms", false, new IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
       {"replication-recv-timeout-ms", false, new IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
+      {"replication-delay-bytes", false, new IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
+      {"replication-delay-updates", false, new IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
       {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
       {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
       {"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
diff --git a/src/config/config.h b/src/config/config.h
index 2dd4157..2e774c9 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -120,6 +120,8 @@
   int slave_priority = 100;
   int replication_connect_timeout_ms = 3100;
   int replication_recv_timeout_ms = 3200;
+  int max_replication_delay_bytes = 16 * 1024;  // 16KB default
+  int max_replication_delay_updates = 16;       // 16 updates default
   int max_db_size = 0;
   int max_replication_mb = 0;
   int max_io_mb = 0;