JAMES-3693 Update redis config to support master-replica topology (#2238)

diff --git a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala
index ce6fbe1..0cdf80a 100644
--- a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala
+++ b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala
@@ -29,36 +29,42 @@
 import org.slf4j.{Logger, LoggerFactory}
 
 object RedisConfiguration {
-  val CLUSTER_ENABLED_DEFAULT = false
+  val STANDALONE_TOPOLOGY = "standalone"
+  val CLUSTER_TOPOLOGY = "cluster"
+  val MASTER_REPLICA_TOPOLOGY = "master-replica"
 
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[RedisConfiguration])
 
   def from(config: Configuration): RedisConfiguration = {
     val configuration = from(config.getStringArray("redisURL"),
-      config.getBoolean("cluster.enabled", CLUSTER_ENABLED_DEFAULT),
+      config.getString("redis.topology", STANDALONE_TOPOLOGY) match {
+        case STANDALONE_TOPOLOGY => Standalone
+        case CLUSTER_TOPOLOGY => Cluster
+        case MASTER_REPLICA_TOPOLOGY => MasterReplica
+        case _ => throw new NotImplementedError()
+      },
       Option(config.getInteger("redis.ioThreads", null)).map(Integer2int),
       Option(config.getInteger("redis.workerThreads", null)).map(Integer2int))
 
     LOGGER.info("Redis was loaded with configuration: \n" +
       "redisURL: {}\n" +
-      "isCluster: {}\n" +
+      "redisTopology: {}\n" +
       "redis.ioThreads: {}\n" +
       "redis.workerThreads: {}", configuration.redisURI.value.map(_.toString).mkString(";"),
-      configuration.isCluster, configuration.ioThreads, configuration.workerThreads)
+      configuration.redisTopology, configuration.ioThreads, configuration.workerThreads)
 
     configuration
   }
 
-  def from(redisUri: String, isCluster: Boolean, ioThreads: Option[Int], workerThreads: Option[Int]): RedisConfiguration =
-    from(Array(redisUri), isCluster, ioThreads, workerThreads)
+  def from(redisUri: String, redisTopology: RedisTopology, ioThreads: Option[Int], workerThreads: Option[Int]): RedisConfiguration =
+    from(Array(redisUri), redisTopology, ioThreads, workerThreads)
 
-  def from(redisUris: Array[String], isCluster: Boolean, ioThreads: Option[Int], workerThreads: Option[Int]): RedisConfiguration = {
+  def from(redisUris: Array[String], redisTopology: RedisTopology, ioThreads: Option[Int], workerThreads: Option[Int]): RedisConfiguration = {
     Preconditions.checkArgument(redisUris != null && redisUris.length > 0)
-    Preconditions.checkNotNull(isCluster)
-    RedisConfiguration(RedisUris.from(redisUris), isCluster, ioThreads, workerThreads)
+    RedisConfiguration(RedisUris.from(redisUris), redisTopology, ioThreads, workerThreads)
   }
 
-  def from(redisUri: String, isCluster: Boolean): RedisConfiguration = from(redisUri, isCluster, None, None)
+  def from(redisUri: String, redisTopology: RedisTopology): RedisConfiguration = from(redisUri, redisTopology, None, None)
 }
 
 object RedisUris {
@@ -82,4 +88,12 @@
   def from(value: Array[String]): RedisUris = liftOrThrow(value.toList.map(RedisURI.create))
 }
 
-case class RedisConfiguration(redisURI: RedisUris, isCluster: Boolean, ioThreads: Option[Int], workerThreads:Option[Int])
+sealed trait RedisTopology
+
+case object Standalone extends RedisTopology
+
+case object Cluster extends RedisTopology
+
+case object MasterReplica extends RedisTopology
+
+case class RedisConfiguration(redisURI: RedisUris, redisTopology: RedisTopology, ioThreads: Option[Int], workerThreads:Option[Int])
diff --git a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala
index 2b1d7e7..d149f3f 100644
--- a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala
+++ b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala
@@ -25,6 +25,7 @@
 import io.lettuce.core.api.reactive.RedisReactiveCommands
 import io.lettuce.core.cluster.RedisClusterClient
 import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands
+import io.lettuce.core.codec.StringCodec
 import jakarta.annotation.PreDestroy
 import jakarta.inject.Inject
 import org.apache.commons.lang3.StringUtils
@@ -45,9 +46,10 @@
 class RedisHealthCheck @Inject()(redisConfiguration: RedisConfiguration) extends HealthCheck {
 
   private val healthcheckTimeout: Duration = Duration.ofSeconds(3)
-  private val healthcheckPerform: RedisHealthcheckPerform = redisConfiguration.isCluster match {
-    case true => new RedisClusterHealthCheckPerform(redisConfiguration, healthcheckTimeout)
-    case false => new RedisStandaloneHealthCheckPerform(redisConfiguration, healthcheckTimeout)
+  private val healthcheckPerform: RedisHealthcheckPerform = redisConfiguration.redisTopology match {
+    case Cluster => new RedisClusterHealthCheckPerform(redisConfiguration, healthcheckTimeout)
+    case MasterReplica => new RedisMasterReplicaHealthCheckPerform(redisConfiguration, healthcheckTimeout)
+    case Standalone => new RedisStandaloneHealthCheckPerform(redisConfiguration, healthcheckTimeout)
   }
 
   override def componentName(): ComponentName = redisComponent
@@ -123,3 +125,33 @@
       .subscribe()
 
 }
+
+class RedisMasterReplicaHealthCheckPerform(val redisConfiguration: RedisConfiguration,
+                                        val healthcheckTimeout: Duration) extends RedisHealthcheckPerform {
+
+  private val PING_SUCCESS_RESPONSE = "PONG"
+
+  private val redisClient: RedisClient = RedisClient.create
+
+  private val redisCommand: RedisReactiveCommands[String, String] = io.lettuce.core.masterreplica.MasterReplica.connect(redisClient,
+    StringCodec.UTF8,
+    redisConfiguration.redisURI.value
+      .map(rURI => {
+        rURI.setTimeout(healthcheckTimeout)
+        rURI
+      }).asJava)
+    .reactive()
+
+  override def check(): SMono[Result] =
+    SMono(redisCommand.ping())
+      .timeout(healthcheckTimeout.toScala)
+      .filter(_ == PING_SUCCESS_RESPONSE)
+      .map(_ => Result.healthy(redisComponent))
+      .switchIfEmpty(SMono.just(Result.degraded(redisComponent, "Can not PING to Redis.")))
+
+  override def close(): Unit =
+    Mono.fromCompletionStage(redisClient.shutdownAsync())
+      .subscribeOn(Schedulers.boundedElastic())
+      .subscribe()
+
+}
diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java
index 94eb71f..4090faa 100644
--- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java
+++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java
@@ -66,7 +66,9 @@
                     .map(redisURIFunction())
                     .map(URI::toString)
                     .toArray(String[]::new),
-                true, OptionConverters.toScala(Optional.empty()), OptionConverters.toScala(Optional.empty()));
+                Cluster$.MODULE$,
+                OptionConverters.toScala(Optional.empty()),
+                OptionConverters.toScala(Optional.empty()));
         }
 
         public void pauseOne() {
diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala
index b304089..2638525 100644
--- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala
+++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala
@@ -30,7 +30,7 @@
   "RedisConfiguration" should "parse Redis URI from config" in {
     val config = new PropertiesConfiguration()
     config.addProperty("redisURL", "redis://localhost:6379")
-    config.addProperty("cluster.enabled", true)
+    config.addProperty("redis.topology", "master-replica")
     config.addProperty("redis.ioThreads", 16)
     config.addProperty("redis.workerThreads", 32)
 
@@ -38,7 +38,7 @@
 
     redisConfig.redisURI.value should have length 1
     redisConfig.redisURI.value should contain theSameElementsAs List(RedisURI.create("redis://localhost:6379"))
-    redisConfig.isCluster shouldEqual true
+    redisConfig.redisTopology shouldEqual MasterReplica
     redisConfig.ioThreads shouldEqual Some(16)
     redisConfig.workerThreads shouldEqual Some(32)
   }
@@ -47,7 +47,7 @@
     val config = new PropertiesConfiguration()
     config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
     config.addProperty("redisURL", "redis://localhost:6379,redis://localhost:6380")
-    config.addProperty("cluster.enabled", true)
+    config.addProperty("redis.topology", "cluster")
     config.addProperty("redis.ioThreads", 16)
     config.addProperty("redis.workerThreads", 32)
 
@@ -55,7 +55,7 @@
 
     redisConfig.redisURI.value should have length 2
     redisConfig.redisURI.value should contain theSameElementsAs List(RedisURI.create("redis://localhost:6379"), RedisURI.create("redis://localhost:6380"))
-    redisConfig.isCluster shouldEqual true
+    redisConfig.redisTopology shouldEqual Cluster
     redisConfig.ioThreads shouldEqual Some(16)
     redisConfig.workerThreads shouldEqual Some(32)
   }
@@ -68,7 +68,7 @@
 
     redisConfig.redisURI.value should have length 1
     redisConfig.redisURI.value should contain theSameElementsAs List(RedisURI.create("redis://localhost:6379"))
-    redisConfig.isCluster shouldEqual false
+    redisConfig.redisTopology shouldEqual Standalone
     redisConfig.ioThreads shouldEqual None
     redisConfig.workerThreads shouldEqual None
   }
diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisExtension.java b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisExtension.java
index 7f8430f..4936ef2 100644
--- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisExtension.java
+++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisExtension.java
@@ -54,7 +54,7 @@
             @Provides
             @Singleton
             public  RedisConfiguration provideConfig() {
-                return RedisConfiguration.from(dockerRedis().redisURI().toString(), false);
+                return RedisConfiguration.from(dockerRedis().redisURI().toString(), Standalone$.MODULE$);
             }
         };
     }
diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaExtension.java b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaExtension.java
new file mode 100644
index 0000000..2f6cf2c
--- /dev/null
+++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaExtension.java
@@ -0,0 +1,168 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.redis;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.james.backends.redis.DockerRedis.DEFAULT_IMAGE_NAME;
+import static org.apache.james.backends.redis.DockerRedis.DEFAULT_PORT;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+
+import jakarta.inject.Singleton;
+
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.james.GuiceModuleTestExtension;
+import org.apache.james.util.Runnables;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+
+import scala.Function2;
+import scala.jdk.javaapi.OptionConverters;
+
+public class RedisMasterReplicaExtension implements GuiceModuleTestExtension {
+
+    public static class RedisClusterContainer extends ArrayList<GenericContainer> {
+        public RedisClusterContainer(Collection<? extends GenericContainer> c) {
+            super(c);
+        }
+
+        public RedisConfiguration getRedisConfiguration() {
+            return RedisConfiguration.from(this.stream()
+                    .map(redisURIFunction())
+                    .map(URI::toString)
+                    .toArray(String[]::new),
+                MasterReplica$.MODULE$,
+                OptionConverters.toScala(Optional.empty()),
+                OptionConverters.toScala(Optional.empty()));
+        }
+
+        public void pauseOne() {
+            GenericContainer container = this.get(0);
+            container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec();
+        }
+
+        public void unPauseOne() {
+            GenericContainer container = this.get(0);
+            if (TRUE.equals(container.getDockerClient().inspectContainerCmd(container.getContainerId())
+                .exec()
+                .getState()
+                .getPaused())) {
+                container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec();
+            }
+        }
+    }
+
+    public static final Function2<String, Boolean, GenericContainer> redisContainerSupplier = (alias, isSlave) ->
+        new GenericContainer<>(DEFAULT_IMAGE_NAME)
+            .withExposedPorts(DEFAULT_PORT)
+            .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withName("james-" + alias + "-test-" + UUID.randomUUID()))
+            .withCommand(Optional.of(isSlave).filter(aBoolean -> aBoolean)
+                .map(aBoolean -> "redis-server --appendonly yes --port 6379 --slaveof redis1 6379")
+                .orElse("redis-server --appendonly yes --port 6379"))
+            .withNetworkAliases(alias)
+            .waitingFor(Wait.forLogMessage(".*Ready to accept connections.*", 1)
+                .withStartupTimeout(Duration.ofMinutes(2)));
+
+    static final GenericContainer redis1 = redisContainerSupplier.apply("redis1", false);
+    static final GenericContainer redis2 = redisContainerSupplier.apply("redis2", true);
+    static final GenericContainer redis3 = redisContainerSupplier.apply("redis3", true);
+
+    private RedisClusterContainer redisClusterContainer;
+    private final Network network;
+
+    public RedisMasterReplicaExtension() {
+        this(Network.newNetwork());
+    }
+
+    public RedisMasterReplicaExtension(Network network) {
+        this.network = network;
+        redis1.withNetwork(network);
+        redis2.withNetwork(network);
+        redis3.withNetwork(network);
+    }
+
+    @Override
+    public void beforeAll(ExtensionContext extensionContext) throws IOException, InterruptedException {
+        redis1.start();
+        redis2.start();
+        redis3.start();
+        redisClusterContainer = new RedisClusterContainer(List.of(redis1, redis2, redis3));
+    }
+
+    @Override
+    public void afterAll(ExtensionContext extensionContext) {
+        Runnables.runParallel(
+            redis1::stop,
+            redis2::stop,
+            redis3::stop);
+        network.close();
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext extensionContext) throws Exception {
+        redisClusterContainer.forEach(Throwing.consumer(container -> container.execInContainer("redis-cli", "flushall")));
+    }
+
+    @Override
+    public Module getModule() {
+        return new AbstractModule() {
+            @Provides
+            @Singleton
+            public RedisConfiguration provideRedisConfiguration() {
+                return redisClusterContainer.getRedisConfiguration();
+            }
+        };
+    }
+
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return parameterContext.getParameter().getType() == RedisClusterContainer.class;
+    }
+
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return new RedisClusterContainer(List.of(redis1, redis2, redis3));
+    }
+
+    private static Function<GenericContainer, URI> redisURIFunction() {
+        return redisContainer -> Throwing.supplier(() -> new URIBuilder()
+            .setScheme("redis")
+            .setHost(redisContainer.getHost())
+            .setPort(redisContainer.getMappedPort(DEFAULT_PORT))
+            .build()).get();
+    }
+}
\ No newline at end of file
diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala
new file mode 100644
index 0000000..cda4a26
--- /dev/null
+++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala
@@ -0,0 +1,72 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ************************************************************** */
+
+package org.apache.james.backends.redis
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.james.backends.redis.RedisMasterReplicaExtension.RedisClusterContainer
+import org.assertj.core.api.Assertions.assertThat
+import org.awaitility.Awaitility
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import reactor.core.scala.publisher.SMono
+
+@ExtendWith(Array(classOf[RedisMasterReplicaExtension]))
+class RedisMasterReplicaHealthCheckTest {
+  var redisHealthCheck: RedisHealthCheck = _
+
+  @BeforeEach
+  def setup(redis: RedisClusterContainer): Unit = {
+    redisHealthCheck = new RedisHealthCheck(redis.getRedisConfiguration)
+  }
+
+  @AfterEach
+  def afterEach(redis: RedisClusterContainer): Unit = {
+    redis.unPauseOne();
+  }
+
+  @Test
+  def checkShouldReturnHealthyWhenRedisIsRunning(): Unit = {
+    val result = SMono.fromPublisher(redisHealthCheck.check()).block()
+
+    assertThat(result.isHealthy).isTrue
+  }
+
+  @Test
+  def checkShouldReturnDegradedWhenRedisIsDown(redis: RedisClusterContainer): Unit = {
+    redis.pauseOne()
+
+    Awaitility.await()
+      .pollInterval(2, TimeUnit.SECONDS)
+      .atMost(20, TimeUnit.SECONDS)
+      .untilAsserted(() => assertThat(SMono.fromPublisher(redisHealthCheck.check()).block().isDegraded).isTrue)
+  }
+
+  @Test
+  def checkShouldReturnHealthyWhenRedisIsRecovered(redis: RedisClusterContainer): Unit = {
+    redis.pauseOne()
+    redis.unPauseOne()
+
+    Awaitility.await()
+      .pollInterval(2, TimeUnit.SECONDS)
+      .atMost(20, TimeUnit.SECONDS)
+      .untilAsserted(() => assertThat(SMono.fromPublisher(redisHealthCheck.check()).block().isHealthy).isTrue)
+  }
+}
diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala
index 12a9c15..c450ec4 100644
--- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala
+++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala
@@ -33,7 +33,7 @@
 
   @BeforeEach
   def setup(redis: DockerRedis): Unit = {
-    val redisConfiguration: RedisConfiguration = RedisConfiguration.from(redis.redisURI().toString, isCluster = false)
+    val redisConfiguration: RedisConfiguration = RedisConfiguration.from(redis.redisURI().toString, Standalone)
 
     redisHealthCheck = new RedisHealthCheck(redisConfiguration)
   }
diff --git a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/redis.adoc b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/redis.adoc
index a673c73..3a4bb1e 100644
--- a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/redis.adoc
+++ b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/redis.adoc
@@ -15,8 +15,8 @@
 | redisURL
 | the Redis URI pointing to Redis server. Compulsory.
 
-| cluster.enabled
-| Whether we should attempt connections to Redis as a cluster.Defaults to false.
+| redis.topology
+| Redis server topology. Defaults to standalone. Possible values: standalone, cluster, master-replica
 
 | redis.ioThreads
 | IO threads to be using for the underlying Netty networking resources. If unspecified driver defaults applies.
diff --git a/server/mailet/rate-limiter-redis/README.adoc b/server/mailet/rate-limiter-redis/README.adoc
index eef8043..e8f5c0a 100644
--- a/server/mailet/rate-limiter-redis/README.adoc
+++ b/server/mailet/rate-limiter-redis/README.adoc
@@ -20,7 +20,7 @@
 
 ----
 redisURL=redis://redis:6379
-cluster.enabled=false
+redis.topology=standalone
 ----
 
 - 5. Use the rate limiting mailets within `mailetcontainer.xml`
diff --git a/server/mailet/rate-limiter-redis/redis.properties b/server/mailet/rate-limiter-redis/redis.properties
index 449e981..a8fe0af 100644
--- a/server/mailet/rate-limiter-redis/redis.properties
+++ b/server/mailet/rate-limiter-redis/redis.properties
@@ -1,2 +1,2 @@
 redisURL=redis://redis:6379
-cluster.enabled=false
\ No newline at end of file
+redis.topology=standalone
\ No newline at end of file
diff --git a/server/mailet/rate-limiter-redis/src/main/java/org/apache/james/rate/limiter/redis/RedisMasterReplicaRateLimiterFactory.java b/server/mailet/rate-limiter-redis/src/main/java/org/apache/james/rate/limiter/redis/RedisMasterReplicaRateLimiterFactory.java
new file mode 100644
index 0000000..6fd3944
--- /dev/null
+++ b/server/mailet/rate-limiter-redis/src/main/java/org/apache/james/rate/limiter/redis/RedisMasterReplicaRateLimiterFactory.java
@@ -0,0 +1,73 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rate.limiter.redis;
+
+import java.util.List;
+import java.util.Set;
+
+import es.moki.ratelimitj.core.limiter.request.AbstractRequestRateLimiterFactory;
+import es.moki.ratelimitj.core.limiter.request.ReactiveRequestRateLimiter;
+import es.moki.ratelimitj.core.limiter.request.RequestLimitRule;
+import es.moki.ratelimitj.core.limiter.request.RequestRateLimiter;
+import es.moki.ratelimitj.redis.request.RedisSlidingWindowRequestRateLimiter;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisURI;
+import io.lettuce.core.codec.StringCodec;
+import io.lettuce.core.masterreplica.MasterReplica;
+import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
+
+public class RedisMasterReplicaRateLimiterFactory extends AbstractRequestRateLimiterFactory<RedisSlidingWindowRequestRateLimiter> {
+    private final RedisClient client;
+    private final List<RedisURI> redisURIs;
+    private StatefulRedisMasterReplicaConnection<String, String> connection;
+
+    public RedisMasterReplicaRateLimiterFactory(RedisClient client, List<RedisURI> redisURIs) {
+        this.client = client;
+        this.redisURIs = redisURIs;
+    }
+
+    @Override
+    protected RedisSlidingWindowRequestRateLimiter create(Set<RequestLimitRule> rules) {
+        return new RedisSlidingWindowRequestRateLimiter(this.getConnection().reactive(), this.getConnection().reactive(), rules);
+    }
+
+    @Override
+    public RequestRateLimiter getInstance(Set<RequestLimitRule> rules) {
+        return this.lookupInstance(rules);
+    }
+
+    @Override
+    public ReactiveRequestRateLimiter getInstanceReactive(Set<RequestLimitRule> rules) {
+        return this.lookupInstance(rules);
+    }
+
+    @Override
+    public void close() {
+        this.client.shutdown();
+    }
+
+    private StatefulRedisMasterReplicaConnection<String, String> getConnection() {
+        if (this.connection == null) {
+            this.connection = MasterReplica.connect(this.client, StringCodec.UTF8, redisURIs);
+        }
+
+        return this.connection;
+    }
+}
diff --git a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
index cbfad1b..9047f91 100644
--- a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
+++ b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
@@ -20,6 +20,7 @@
 package org.apache.james.rate.limiter.redis
 
 import java.time.Duration
+
 import com.google.inject.multibindings.Multibinder
 import com.google.inject.{AbstractModule, Provides, Scopes}
 import es.moki.ratelimitj.core.limiter.request.{AbstractRequestRateLimiterFactory, ReactiveRequestRateLimiter, RequestLimitRule}
@@ -27,8 +28,7 @@
 import io.lettuce.core.RedisClient
 import io.lettuce.core.cluster.RedisClusterClient
 import io.lettuce.core.resource.ClientResources
-import org.apache.james.backends.redis.{RedisConfiguration, RedisHealthCheck}
-
+import org.apache.james.backends.redis.{Cluster, MasterReplica, RedisConfiguration, RedisHealthCheck, RedisTopology, Standalone}
 import jakarta.inject.Inject
 import org.apache.james.core.healthcheck.HealthCheck
 import org.apache.james.rate.limiter.api.Increment.Increment
@@ -58,17 +58,22 @@
 }
 
 class RedisRateLimiterFactory @Inject()(redisConfiguration: RedisConfiguration) extends RateLimiterFactory {
-  val rateLimitjFactory: AbstractRequestRateLimiterFactory[RedisSlidingWindowRequestRateLimiter] =
-    if (redisConfiguration.isCluster) {
-      val resourceBuilder = ClientResources.builder()
-        .threadFactoryProvider(poolName => NamedThreadFactory.withName(s"redis-driver-$poolName"))
-      redisConfiguration.ioThreads.foreach(value => resourceBuilder.ioThreadPoolSize(value))
-      redisConfiguration.workerThreads.foreach(value =>resourceBuilder.computationThreadPoolSize(value))
-      new RedisClusterRateLimiterFactory(RedisClusterClient.create(resourceBuilder.build(),
-        redisConfiguration.redisURI.value.asJava))
-    } else {
-      new RedisSingleInstanceRateLimitjFactory(RedisClient.create(redisConfiguration.redisURI.value.last))
+  val rateLimitjFactory: AbstractRequestRateLimiterFactory[RedisSlidingWindowRequestRateLimiter] = {
+    redisConfiguration.redisTopology match {
+      case Cluster =>
+        val resourceBuilder = ClientResources.builder()
+          .threadFactoryProvider(poolName => NamedThreadFactory.withName(s"redis-driver-$poolName"))
+        redisConfiguration.ioThreads.foreach(value => resourceBuilder.ioThreadPoolSize(value))
+        redisConfiguration.workerThreads.foreach(value =>resourceBuilder.computationThreadPoolSize(value))
+        new RedisClusterRateLimiterFactory(RedisClusterClient.create(resourceBuilder.build(),
+          redisConfiguration.redisURI.value.asJava))
+      case MasterReplica =>
+        new RedisMasterReplicaRateLimiterFactory(RedisClient.create(redisConfiguration.redisURI.value.last), redisConfiguration.redisURI.value.asJava)
+      case Standalone =>
+        new RedisSingleInstanceRateLimitjFactory(RedisClient.create(redisConfiguration.redisURI.value.last))
+      case _ => throw new NotImplementedError()
     }
+  }
 
   override def withSpecification(rules: Rules, precision: Option[Duration]): RateLimiter =
     RedisRateLimiter(rateLimitjFactory.getInstanceReactive(rules.rules
diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala
index 336915d..79a5fd5 100644
--- a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala
+++ b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala
@@ -19,9 +19,9 @@
 
 package org.apache.james.rate.limiter
 
-import org.apache.james.backends.redis.{DockerRedis, RedisConfiguration, RedisExtension}
-
+import org.apache.james.backends.redis.{DockerRedis, RedisConfiguration, RedisExtension, Standalone}
 import java.time.Duration
+
 import org.apache.james.rate.limiter.api.{RateLimiterContract, RateLimiterFactory}
 import org.apache.james.rate.limiter.redis.RedisRateLimiterFactory
 import org.junit.jupiter.api.BeforeEach
@@ -34,7 +34,7 @@
 
   @BeforeEach
   def setup(redis: DockerRedis): Unit = {
-    redisRateLimiterConfiguration = RedisConfiguration.from(redis.redisURI().toString, false)
+    redisRateLimiterConfiguration = RedisConfiguration.from(redis.redisURI().toString, Standalone)
   }
 
   override def testee(): RateLimiterFactory = new RedisRateLimiterFactory(redisRateLimiterConfiguration)
diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithMasterReplicaTopologyTest.scala b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithMasterReplicaTopologyTest.scala
new file mode 100644
index 0000000..ee0c33e
--- /dev/null
+++ b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithMasterReplicaTopologyTest.scala
@@ -0,0 +1,59 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                 *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ************************************************************** */
+
+package org.apache.james.rate.limiter
+
+import java.time.Duration
+
+import eu.timepit.refined.auto._
+import org.apache.james.backends.redis.RedisMasterReplicaExtension
+import org.apache.james.rate.limiter.RedisRateLimiterWithMasterReplicaTopologyTest.{RULES, SLIDING_WIDOW_PRECISION}
+import org.apache.james.rate.limiter.api.{AcceptableRate, RateLimitingKey, RateLimitingResult, Rule, Rules}
+import org.apache.james.rate.limiter.redis.RedisRateLimiterFactory
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{BeforeEach, Test}
+import reactor.core.scala.publisher.SMono
+
+object RedisRateLimiterWithMasterReplicaTopologyTest {
+  val SLIDING_WIDOW_PRECISION: Option[Duration] = Some(Duration.ofSeconds(1))
+  val RULES = Rules(Seq(Rule(4L, Duration.ofSeconds(2))))
+}
+
+@ExtendWith(Array(classOf[RedisMasterReplicaExtension]))
+class RedisRateLimiterWithMasterReplicaTopologyTest {
+  var rateLimiterFactory: RedisRateLimiterFactory = _
+
+  @BeforeEach
+  def setup(redisClusterContainer: RedisMasterReplicaExtension.RedisClusterContainer): Unit = {
+    rateLimiterFactory = new RedisRateLimiterFactory(redisClusterContainer.getRedisConfiguration)
+  }
+
+  @Test
+  def test(redisClusterContainer: RedisMasterReplicaExtension.RedisClusterContainer): Unit = {
+    val rateLimiterFactory = new RedisRateLimiterFactory(redisClusterContainer.getRedisConfiguration)
+    val rateLimiter = rateLimiterFactory.withSpecification(RULES, SLIDING_WIDOW_PRECISION)
+    val actual: RateLimitingResult = SMono(rateLimiter.rateLimit(TestKey("key1"), 4)).block()
+    assertThat(actual).isEqualTo(AcceptableRate)
+  }
+}
+
+case class TestKey(value: String) extends RateLimitingKey {
+  override def asString: String = value
+}
diff --git a/upgrade-instructions.md b/upgrade-instructions.md
index 3bcbd25..59feaa8 100644
--- a/upgrade-instructions.md
+++ b/upgrade-instructions.md
@@ -31,6 +31,18 @@
  - [javax -> jakarta](#javax---jakarta)
  - [Make all queues on RabbitMQ quorum queue when `quorum.queues.enable=true`](#make-all-queues-on-rabbitmq-quorum-queue-when-quorumqueuesenabletrue)
 
+### Change cluster.enabled in redis.properties to redis.topology
+
+Date: 05/09/2024
+
+Concerned products: Distributed James, Cassandra James
+
+JIRA: https://issues.apache.org/jira/browse/JAMES-3693
+
+Now James supports more than two topologies (previously there were just cluster and standalone).
+
+Use `redis.topology` property instead.
+
 ### Make all queues on RabbitMQ quorum queue when `quorum.queues.enable=true`
 
 Date: 16/04/2024