IGNITE-22128 Balancing partitions across stripes (#3690)

diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
index 4c7efa1..66c77c3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
@@ -45,7 +45,7 @@
     private LogStorage logStorage;
 
     /** Stripe, that corresponds to the current log storage instance. */
-    private final Stripe stripe;
+    private Stripe stripe;
 
     /** Size threshold of log entries list, that will trigger the flush upon the excess. */
     private int maxAppendBufferSize;
@@ -56,15 +56,6 @@
      */
     private boolean sharedLogStorage;
 
-    /**
-     * Constructor.
-     *
-     * @param stripe Stripe that corresponds to a worker thread in {@link LogManagerOptions#getLogManagerDisruptor()}.
-     */
-    public StripeAwareLogManager(Stripe stripe) {
-        this.stripe = stripe;
-    }
-
     @Override
     public boolean init(LogManagerOptions opts) {
         LogStorage logStorage = opts.getLogStorage();
@@ -73,7 +64,15 @@
         this.logStorage = logStorage;
         this.maxAppendBufferSize = opts.getRaftOptions().getMaxAppendBufferSize();
 
-        return super.init(opts);
+        boolean isInitSuccessfully = super.init(opts);
+        int stripeId = opts.getLogManagerDisruptor().getStripe(opts.getNode().getNodeId());
+        // NOTE(review): a failed super.init() presumably may leave the group unsubscribed (confirm); that is reported as a failed init.
+        assert stripeId != -1 || !isInitSuccessfully : "Initialized log manager is not subscribed [nodeId=" + opts.getNode().getNodeId() + "].";
+        if (stripeId == -1) {
+            return false;
+        }
+        this.stripe = opts.getLogStripes().get(stripeId);
+        return isInitSuccessfully;
     }
 
     /**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 963d25b..b41929d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -611,9 +611,9 @@
     private boolean initLogStorage() {
         Requires.requireNonNull(this.fsmCaller, "Null fsm caller");
         this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
-        int stripe = options.getLogManagerDisruptor().getStripe(getNodeId());
-        this.logManager = new StripeAwareLogManager(options.getLogStripes().get(stripe));
-        final LogManagerOptions opts = new LogManagerOptions();
+        this.logManager = new StripeAwareLogManager();
+
+        LogManagerOptions opts = new LogManagerOptions();
         opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
         opts.setLogStorage(this.logStorage);
         opts.setConfigurationManager(this.configManager);
@@ -622,6 +622,7 @@
         opts.setNodeMetrics(this.metrics);
         opts.setRaftOptions(this.raftOptions);
         opts.setLogManagerDisruptor(options.getLogManagerDisruptor());
+        opts.setLogStripes(options.getLogStripes());
 
         return this.logManager.init(opts);
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index bb27f01..0010003 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -29,6 +29,7 @@
 import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -48,6 +49,9 @@
     /** The logger. */
     private static final IgniteLogger LOG = Loggers.forClass(StripedDisruptor.class);
 
+    /** Counter used to pick the stripe for the next subscriber, so that subscribers are spread across stripes in a round-robin manner. */
+    private final AtomicInteger incrementalCounter = new AtomicInteger();
+
     /** Array of disruptors. Each Disruptor in the appropriate stripe. */
     private final Disruptor<T>[] disruptors;
 
@@ -152,7 +156,7 @@
                 .setWaitStrategy(useYieldStrategy ? new YieldingWaitStrategy() : new BlockingWaitStrategy())
                 .build();
 
-            eventHandlers.add(new StripeEntryHandler());
+            eventHandlers.add(new StripeEntryHandler(i));
             exceptionHandlers.add(new StripeExceptionHandler(name));
 
             disruptor.handleEventsWith(eventHandlers.get(i));
@@ -202,12 +206,16 @@
      * @return Disruptor queue appropriate to the group.
      */
     public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler, BiConsumer<T, Throwable> exceptionHandler) {
-        eventHandlers.get(getStripe(nodeId)).subscribe(nodeId, handler);
+        assert getStripe(nodeId) == -1 : "The double subscriber for the one replication group [nodeId=" + nodeId + "].";
+        // Each call advances the round-robin counter, so the stripe is picked once and reused for both handlers and the queue.
+        int stripeId = nextStripeToSubscribe();
+
+        eventHandlers.get(stripeId).subscribe(nodeId, handler);
 
         if (exceptionHandler != null)
-            exceptionHandlers.get(getStripe(nodeId)).subscribe(nodeId, exceptionHandler);
+            exceptionHandlers.get(stripeId).subscribe(nodeId, exceptionHandler);
 
-        return queues[getStripe(nodeId)];
+        return queues[stripeId];
     }
 
     /**
@@ -216,18 +224,37 @@
      * @param nodeId Node id.
      */
     public void unsubscribe(NodeId nodeId) {
-        eventHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
-        exceptionHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
+        int stripeId = getStripe(nodeId);
+        // getStripe() returns -1 for a group that is not subscribed; -1 is not a valid index into the handler lists.
+        assert stripeId != -1 : "The replication group has not subscribed yet [nodeId=" + nodeId + "].";
+
+        eventHandlers.get(stripeId).unsubscribe(nodeId);
+        exceptionHandlers.get(stripeId).unsubscribe(nodeId);
     }
 
     /**
-     * Determines a stripe by a node id and returns a stripe number.
+     * Finds the stripe that the replication group is subscribed to by checking the event handler of every stripe.
+     * If the replication group has not subscribed yet, this method returns {@code -1}.
      *
      * @param nodeId Node id.
      * @return Stripe of the Striped disruptor.
      */
     public int getStripe(NodeId nodeId) {
-        return Math.abs(nodeId.hashCode() % stripes);
+        for (StripeEntryHandler handler : eventHandlers) {
+            if (handler.isSubscribed(nodeId)) {
+                return handler.stripeId;
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Picks the stripe for the next subscription in a round-robin manner; every call advances the counter.
+     * @return The stripe number, from {@code 0} (inclusive) to the number of stripes (exclusive).
+     */
+    private int nextStripeToSubscribe() {
+        return Math.abs(incrementalCounter.getAndIncrement() % stripes);
     }
 
     /**
@@ -237,7 +264,11 @@
      * @return Disruptor queue appropriate to the group.
      */
     public RingBuffer<T> queue(NodeId nodeId) {
-        return queues[getStripe(nodeId)];
+        int stripeId = getStripe(nodeId);
+        // NOTE(review): getStripe() checks the handlers stripe by stripe, so every queue() call pays for that scan.
+        assert stripeId != -1 : "The replication group has not subscribed yet [nodeId=" + nodeId + "].";
+
+        return queues[stripeId];
     }
 
     /**
@@ -245,16 +276,28 @@
      * It routs an event to the event handler for a group.
      */
     private class StripeEntryHandler implements EventHandler<T> {
-        private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers;
+        private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers = new ConcurrentHashMap<>();
 
         /** Size of the batch that is currently being handled. */
         private int currentBatchSize = 0;
 
+        /** Stripe id. */
+        private final int stripeId;
+
         /**
          * The constructor.
          */
-        StripeEntryHandler() {
-            subscribers = new ConcurrentHashMap<>();
+        StripeEntryHandler(int stripeId) {
+            this.stripeId = stripeId;
+        }
+
+        /**
+         * Checks whether the replication group is subscribed to this stripe.
+         * @param nodeId Replication group node id.
+         * @return {@code true} if the group is subscribed, {@code false} otherwise.
+         */
+        public boolean isSubscribed(NodeId nodeId) {
+            return subscribers.containsKey(nodeId);
         }
 
         /**
@@ -283,7 +326,7 @@
             // TODO: IGNITE-20536 Need to add assert that handler is not null and to implement a no-op handler.
             if (handler != null) {
                 if (metrics != null && metrics.enabled()) {
-                    metrics.hitToStripe(getStripe(event.nodeId()));
+                    metrics.hitToStripe(stripeId);
 
                     if (endOfBatch) {
                         metrics.addBatchSize(currentBatchSize + 1);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
index fb859f3..53640ab 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
@@ -16,6 +16,8 @@
  */
 package org.apache.ignite.raft.jraft.option;
 
+import java.util.List;
+import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
 import org.apache.ignite.raft.jraft.FSMCaller;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
@@ -38,6 +40,15 @@
     private NodeMetrics nodeMetrics;
     private LogEntryCodecFactory logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
     private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+    private List<Stripe> logStripes;
+
+    public void setLogStripes(List<Stripe> logStripes) {
+        this.logStripes = logStripes;
+    }
+
+    public List<Stripe> getLogStripes() {
+        return logStripes;
+    }
 
     public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
         return logManagerDisruptor;
diff --git a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
index 8105dff..06dd061 100644
--- a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.disruptor;
 
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
 import java.util.ArrayList;
+import java.util.Random;
+import java.util.UUID;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -124,6 +127,53 @@
         disruptor.shutdown();
     }
 
+    /**
+     * Checks the distribution of subscribed handlers across stripes.
+     * The distribution algorithm has to distribute handlers as evenly as possible using the round-robin algorithm.
+     */
+    @Test
+    public void tesDistributionHandlers() {
+        Random random = new Random();
+
+        // nextInt(20) may return 0: with no stripes, subscribe() divides by zero and distribution[0] below does not exist.
+        int stripes = random.nextInt(20) + 1;
+
+        StripedDisruptor<NodeIdAwareTestObj> disruptor = new StripedDisruptor<>("test", "test-disruptor",
+                16384,
+                NodeIdAwareTestObj::new,
+                stripes,
+                false,
+                false,
+                null);
+
+        int handlers = random.nextInt(100);
+
+        log.info("Handlers will be distributed across stripes [handlers={}, stripes={}]", handlers, stripes);
+
+        int[] distribution = new int[stripes];
+
+        for (int i = 0; i < handlers; i++) {
+            GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
+            var nodeId = new NodeId("grp", new PeerId(UUID.randomUUID().toString()));
+
+            disruptor.subscribe(nodeId, handler);
+
+            int stripe = disruptor.getStripe(nodeId);
+            assertNotEquals(-1, stripe);
+            distribution[stripe]++;
+        }
+
+        disruptor.shutdown();
+
+        log.info("Result distribution [distribution={}]", distribution);
+
+        int reference = distribution[0];
+
+        for (int i = 1; i < stripes; i++) {
+            assertTrue(distribution[i] == reference || distribution[i] + 1 == reference || distribution[i] - 1 == reference);
+        }
+    }
+
     /** Group event handler. */
     private static class GroupAwareTestObjHandler implements EventHandler<NodeIdAwareTestObj> {
         /** This is a container for the batch events. */