IGNITE-22128 Balancing partitions across stripes (#3690)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
index 4c7efa1..66c77c3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
@@ -45,7 +45,7 @@
private LogStorage logStorage;
/** Stripe, that corresponds to the current log storage instance. */
- private final Stripe stripe;
+ private Stripe stripe;
/** Size threshold of log entries list, that will trigger the flush upon the excess. */
private int maxAppendBufferSize;
@@ -56,15 +56,6 @@
*/
private boolean sharedLogStorage;
- /**
- * Constructor.
- *
- * @param stripe Stripe that corresponds to a worker thread in {@link LogManagerOptions#getLogManagerDisruptor()}.
- */
- public StripeAwareLogManager(Stripe stripe) {
- this.stripe = stripe;
- }
-
@Override
public boolean init(LogManagerOptions opts) {
LogStorage logStorage = opts.getLogStorage();
@@ -73,7 +64,15 @@
this.logStorage = logStorage;
this.maxAppendBufferSize = opts.getRaftOptions().getMaxAppendBufferSize();
- return super.init(opts);
+ boolean isInitSuccessfully = super.init(opts);
+
+ int stripe = opts.getLogManagerDisruptor().getStripe(opts.getNode().getNodeId());
+
+ assert stripe != -1;
+
+ this.stripe = opts.getLogStripes().get(stripe);
+
+ return isInitSuccessfully;
}
/**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 963d25b..b41929d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -611,9 +611,9 @@
private boolean initLogStorage() {
Requires.requireNonNull(this.fsmCaller, "Null fsm caller");
this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
- int stripe = options.getLogManagerDisruptor().getStripe(getNodeId());
- this.logManager = new StripeAwareLogManager(options.getLogStripes().get(stripe));
- final LogManagerOptions opts = new LogManagerOptions();
+ this.logManager = new StripeAwareLogManager();
+
+ LogManagerOptions opts = new LogManagerOptions();
opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
opts.setLogStorage(this.logStorage);
opts.setConfigurationManager(this.configManager);
@@ -622,6 +622,7 @@
opts.setNodeMetrics(this.metrics);
opts.setRaftOptions(this.raftOptions);
opts.setLogManagerDisruptor(options.getLogManagerDisruptor());
+ opts.setLogStripes(options.getLogStripes());
return this.logManager.init(opts);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index bb27f01..0010003 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -48,6 +49,9 @@
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(StripedDisruptor.class);
+ /** The counter is used to generate the next stripe to subscribe to in order to be a round-robin hash. */
+ private final AtomicInteger incrementalCounter = new AtomicInteger();
+
/** Array of disruptors. Each Disruptor in the appropriate stripe. */
private final Disruptor<T>[] disruptors;
@@ -152,7 +156,7 @@
.setWaitStrategy(useYieldStrategy ? new YieldingWaitStrategy() : new BlockingWaitStrategy())
.build();
- eventHandlers.add(new StripeEntryHandler());
+ eventHandlers.add(new StripeEntryHandler(i));
exceptionHandlers.add(new StripeExceptionHandler(name));
disruptor.handleEventsWith(eventHandlers.get(i));
@@ -202,12 +206,16 @@
* @return Disruptor queue appropriate to the group.
*/
public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler, BiConsumer<T, Throwable> exceptionHandler) {
- eventHandlers.get(getStripe(nodeId)).subscribe(nodeId, handler);
+ assert getStripe(nodeId) == -1 : "The double subscriber for the one replication group [nodeId=" + nodeId + "].";
+
+ int stripeId = nextStripeToSubscribe();
+
+ eventHandlers.get(stripeId).subscribe(nodeId, handler);
if (exceptionHandler != null)
- exceptionHandlers.get(getStripe(nodeId)).subscribe(nodeId, exceptionHandler);
+ exceptionHandlers.get(stripeId).subscribe(nodeId, exceptionHandler);
- return queues[getStripe(nodeId)];
+ return queues[stripeId];
}
/**
@@ -216,18 +224,37 @@
* @param nodeId Node id.
*/
public void unsubscribe(NodeId nodeId) {
- eventHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
- exceptionHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
+ int stripeId = getStripe(nodeId);
+
+ assert stripeId != -1 : "The replication group has not subscribed yet [nodeId=" + nodeId + "].";
+
+ eventHandlers.get(stripeId).unsubscribe(nodeId);
+ exceptionHandlers.get(stripeId).unsubscribe(nodeId);
}
/**
- * Determines a stripe by a node id and returns a stripe number.
+ * If the replication group is already subscribed, this method determines a stripe by a node id and returns a stripe number.
+ * If the replication group did not subscribed yet, this method returns {@code -1};
*
* @param nodeId Node id.
* @return Stripe of the Striped disruptor.
*/
public int getStripe(NodeId nodeId) {
- return Math.abs(nodeId.hashCode() % stripes);
+ for (StripeEntryHandler handler : eventHandlers) {
+ if (handler.isSubscribed(nodeId)) {
+ return handler.stripeId;
+ }
+ }
+
+ return -1;
+ }
+
+ /**
+ * Generates the next stripe number in a round-robin manner.
+ * @return The stripe number.
+ */
+ private int nextStripeToSubscribe() {
+ return Math.abs(incrementalCounter.getAndIncrement() % stripes);
}
/**
@@ -237,7 +264,11 @@
* @return Disruptor queue appropriate to the group.
*/
public RingBuffer<T> queue(NodeId nodeId) {
- return queues[getStripe(nodeId)];
+ int stripeId = getStripe(nodeId);
+
+ assert stripeId != -1 : "The replication group has not subscribed yet [nodeId=" + nodeId + "].";
+
+ return queues[stripeId];
}
/**
@@ -245,16 +276,28 @@
* It routs an event to the event handler for a group.
*/
private class StripeEntryHandler implements EventHandler<T> {
- private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers;
+ private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers = new ConcurrentHashMap<>();
/** Size of the batch that is currently being handled. */
private int currentBatchSize = 0;
+ /** Stripe id. */
+ private final int stripeId;
+
/**
* The constructor.
*/
- StripeEntryHandler() {
- subscribers = new ConcurrentHashMap<>();
+ StripeEntryHandler(int stripeId) {
+ this.stripeId = stripeId;
+ }
+
+ /**
+ * Checks the replication group is subscribed to this stripe or not.
+ * @param nodeId Replication group node id.
+ * @return True if the group is subscribed, false otherwise.
+ */
+ public boolean isSubscribed(NodeId nodeId) {
+ return subscribers.containsKey(nodeId);
}
/**
@@ -283,7 +326,7 @@
// TODO: IGNITE-20536 Need to add assert that handler is not null and to implement a no-op handler.
if (handler != null) {
if (metrics != null && metrics.enabled()) {
- metrics.hitToStripe(getStripe(event.nodeId()));
+ metrics.hitToStripe(stripeId);
if (endOfBatch) {
metrics.addBatchSize(currentBatchSize + 1);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
index fb859f3..53640ab 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.raft.jraft.option;
+import java.util.List;
+import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
@@ -38,6 +40,15 @@
private NodeMetrics nodeMetrics;
private LogEntryCodecFactory logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+ private List<Stripe> logStripes;
+
+ public void setLogStripes(List<Stripe> logStripes) {
+ this.logStripes = logStripes;
+ }
+
+ public List<Stripe> getLogStripes() {
+ return this.logStripes;
+ }
public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
return logManagerDisruptor;
diff --git a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
index 8105dff..06dd061 100644
--- a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
@@ -17,12 +17,15 @@
package org.apache.ignite.disruptor;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
+import java.util.Random;
+import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -124,6 +127,53 @@
disruptor.shutdown();
}
+ /**
+ * Checks the distribution of subscribed handlers across stripes.
+ * The distribution algorithm has to distribute handlers as evenly as possible using the round-robin algorithm.
+ */
+ @Test
+ public void tesDistributionHandlers() {
+ Random random = new Random();
+
+ int stripes = random.nextInt(20);
+
+ StripedDisruptor<NodeIdAwareTestObj> disruptor = new StripedDisruptor<>("test", "test-disruptor",
+ 16384,
+ NodeIdAwareTestObj::new,
+ stripes,
+ false,
+ false,
+ null);
+
+ int handlers = random.nextInt(100);
+
+ log.info("Handlers will be distributed across stripes [handlers={}, stripes={}]", handlers, stripes);
+
+ int[] distribution = new int[stripes];
+
+ for (int i = 0; i < handlers; i++) {
+ GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
+
+ var nodeId = new NodeId("grp", new PeerId(UUID.randomUUID().toString()));
+
+ disruptor.subscribe(nodeId, handler);
+
+ int stripe = disruptor.getStripe(nodeId);
+
+ assertNotEquals(-1, stripe);
+
+ distribution[stripe]++;
+ }
+
+ log.info("Result distribution [distribution={}]", distribution);
+
+ int reference = distribution[0];
+
+ for (int i = 1; i < stripes; i++) {
+ assertTrue(distribution[i] == reference || distribution[i] + 1 == reference || distribution[i] - 1 == reference);
+ }
+ }
+
/** Group event handler. */
private static class GroupAwareTestObjHandler implements EventHandler<NodeIdAwareTestObj> {
/** This is a container for the batch events. */