[ENHANCEMENT] Better reactify Identity methods
diff --git a/server/data/data-api/src/main/java/org/apache/james/rrt/api/AliasReverseResolver.java b/server/data/data-api/src/main/java/org/apache/james/rrt/api/AliasReverseResolver.java
index 05117c5..0967209 100644
--- a/server/data/data-api/src/main/java/org/apache/james/rrt/api/AliasReverseResolver.java
+++ b/server/data/data-api/src/main/java/org/apache/james/rrt/api/AliasReverseResolver.java
@@ -19,11 +19,10 @@
 
 package org.apache.james.rrt.api;
 
-import java.util.stream.Stream;
-
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.Username;
+import org.reactivestreams.Publisher;
 
 public interface AliasReverseResolver {
-    Stream<MailAddress> listAddresses(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException;
+    Publisher<MailAddress> listAddresses(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException;
 }
diff --git a/server/data/data-api/src/main/java/org/apache/james/rrt/api/CanSendFrom.java b/server/data/data-api/src/main/java/org/apache/james/rrt/api/CanSendFrom.java
index 9fee906..582376e 100644
--- a/server/data/data-api/src/main/java/org/apache/james/rrt/api/CanSendFrom.java
+++ b/server/data/data-api/src/main/java/org/apache/james/rrt/api/CanSendFrom.java
@@ -18,8 +18,6 @@
  ****************************************************************/
 package org.apache.james.rrt.api;
 
-import java.util.stream.Stream;
-
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.Username;
 import org.reactivestreams.Publisher;
@@ -36,6 +34,6 @@
     /**
      * For a given user, return all the addresses he can use in the from clause of an email.
      */
-    Stream<MailAddress> allValidFromAddressesForUser(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException;
+    Publisher<MailAddress> allValidFromAddressesForUser(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException;
 
 }
diff --git a/server/data/data-api/src/main/java/org/apache/james/rrt/api/RecipientRewriteTable.java b/server/data/data-api/src/main/java/org/apache/james/rrt/api/RecipientRewriteTable.java
index e7506b2..01078df 100644
--- a/server/data/data-api/src/main/java/org/apache/james/rrt/api/RecipientRewriteTable.java
+++ b/server/data/data-api/src/main/java/org/apache/james/rrt/api/RecipientRewriteTable.java
@@ -31,6 +31,8 @@
 
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+
 /**
  * Interface which should be implemented of classes which map recipients.
  */
@@ -138,6 +140,14 @@
             .map(Map.Entry::getKey);
     }
 
+    default Flux<MappingSource> listSourcesReactive(Mapping mapping) {
+        try {
+            return Flux.fromStream(listSources(mapping));
+        } catch (RecipientRewriteTableException e) {
+            return Flux.error(e);
+        }
+    }
+
     default Stream<MappingSource> getSourcesForType(Mapping.Type type) throws RecipientRewriteTableException {
         return getAllMappings()
             .entrySet().stream()
diff --git a/server/data/data-api/src/test/java/org/apache/james/rrt/lib/AliasReverseResolverContract.java b/server/data/data-api/src/test/java/org/apache/james/rrt/lib/AliasReverseResolverContract.java
index 891fe46..3504256 100644
--- a/server/data/data-api/src/test/java/org/apache/james/rrt/lib/AliasReverseResolverContract.java
+++ b/server/data/data-api/src/test/java/org/apache/james/rrt/lib/AliasReverseResolverContract.java
@@ -30,6 +30,8 @@
 
 import com.github.fge.lambdas.Throwing;
 
+import reactor.core.publisher.Flux;
+
 public interface AliasReverseResolverContract {
 
     Domain DOMAIN = Domain.of("example.com");
@@ -66,14 +68,14 @@
 
     @Test
     default void listAddressesShouldContainOnlyUserAddressWhenUserHasNoAlias() throws Exception {
-        assertThat(aliasReverseResolver().listAddresses(USER))
+        assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
             .containsExactly(USER.asMailAddress());
     }
 
     @Test
     default void listAddressesShouldContainOnlyUserAddressWhenUserHasNoAliasAndAnotherUserHasOne() throws Exception {
         redirectUser(USER_ALIAS).to(OTHER_USER);
-        assertThat(aliasReverseResolver().listAddresses(USER))
+        assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
             .containsExactly(USER.asMailAddress());
     }
 
@@ -81,7 +83,7 @@
     default void listAddressesShouldContainUserAddressAndAnAliasOfTheUser() throws Exception {
         redirectUser(USER_ALIAS).to(USER);
 
-        assertThat(aliasReverseResolver().listAddresses(USER))
+        assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
             .containsExactlyInAnyOrder(USER.asMailAddress(), USER_ALIAS.asMailAddress());
     }
 
@@ -91,7 +93,7 @@
         redirectUser(userAliasBis).to(USER_ALIAS);
         redirectUser(USER_ALIAS).to(USER);
 
-        assertThat(aliasReverseResolver().listAddresses(USER))
+        assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
             .containsExactlyInAnyOrder(USER.asMailAddress(), USER_ALIAS.asMailAddress(), userAliasBis.asMailAddress());
     }
 
@@ -101,7 +103,7 @@
 
         redirectDomain(OTHER_DOMAIN).to(DOMAIN);
 
-        assertThat(aliasReverseResolver().listAddresses(USER))
+        assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
             .containsExactlyInAnyOrder(USER.asMailAddress(), fromUser.asMailAddress());
     }
 
@@ -114,7 +116,7 @@
 
         Username userAliasMainDomain = USER_ALIAS.withOtherDomain(DOMAIN);
         Username userOtherDomain = USER.withOtherDomain(OTHER_DOMAIN);
-        assertThat(aliasReverseResolver().listAddresses(USER))
+        assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
             .containsExactlyInAnyOrder(USER.asMailAddress(), userAliasOtherDomain.asMailAddress(), userAliasMainDomain.asMailAddress(), userOtherDomain.asMailAddress());
     }
 
@@ -136,7 +138,7 @@
         Username userAliasExcluded = Username.of("alias" + (recursionLevel - 1) + "@" + DOMAIN.asString());
         redirectUser(USER_ALIAS).to(USER);
 
-        assertThat(aliasReverseResolver().listAddresses(USER))
+        assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
             .doesNotContain(userAliasExcluded.asMailAddress());
     }
 
@@ -147,7 +149,7 @@
         redirectUser(userAliasBis).to(userAlias);
         redirectUser(userAlias).to(USER);
 
-        assertThat(aliasReverseResolver().listAddresses(USER))
+        assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
             .contains(userAliasBis.asMailAddress());
     }
 
@@ -157,7 +159,7 @@
         redirectUser(userAliasBis).to(USER_ALIAS);
         redirectUser(USER_ALIAS).to(USER);
 
-        assertThat(aliasReverseResolver().listAddresses(USER))
+        assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
             .contains(userAliasBis.asMailAddress());
     }
 }
diff --git a/server/data/data-api/src/test/java/org/apache/james/rrt/lib/CanSendFromContract.java b/server/data/data-api/src/test/java/org/apache/james/rrt/lib/CanSendFromContract.java
index 3262131..1127efd 100644
--- a/server/data/data-api/src/test/java/org/apache/james/rrt/lib/CanSendFromContract.java
+++ b/server/data/data-api/src/test/java/org/apache/james/rrt/lib/CanSendFromContract.java
@@ -29,6 +29,8 @@
 
 import com.github.fge.lambdas.Throwing;
 
+import reactor.core.publisher.Flux;
+
 public interface CanSendFromContract {
 
     Domain DOMAIN = Domain.of("example.com");
@@ -173,14 +175,14 @@
 
     @Test
     default void allValidFromAddressesShouldContainOnlyUserAddressWhenUserHasNoAlias() throws Exception {
-        assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+        assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
             .containsExactly(USER.asMailAddress());
     }
 
     @Test
     default void allValidFromAddressesShouldContainOnlyUserAddressWhenUserHasNoAliasAndAnotherUserHasOne() throws Exception {
         redirectUser(USER_ALIAS).to(OTHER_USER);
-        assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+        assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
             .containsExactly(USER.asMailAddress());
     }
 
@@ -188,7 +190,7 @@
     default void allValidFromAddressesShouldContainUserAddressAndAnAliasOfTheUser() throws Exception {
         redirectUser(USER_ALIAS).to(USER);
 
-        assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+        assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
             .containsExactlyInAnyOrder(USER.asMailAddress(), USER_ALIAS.asMailAddress());
     }
 
@@ -198,7 +200,7 @@
         redirectUser(userAliasBis).to(USER_ALIAS);
         redirectUser(USER_ALIAS).to(USER);
 
-        assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+        assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
             .containsExactlyInAnyOrder(USER.asMailAddress(), USER_ALIAS.asMailAddress(), userAliasBis.asMailAddress());
     }
 
@@ -208,7 +210,7 @@
 
         redirectDomain(OTHER_DOMAIN).to(DOMAIN).asAlias();
 
-        assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+        assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
             .containsExactlyInAnyOrder(USER.asMailAddress(), fromUser.asMailAddress());
     }
 
@@ -221,7 +223,7 @@
 
         Username userAliasMainDomain = USER_ALIAS.withOtherDomain(DOMAIN);
         Username userOtherDomain = USER.withOtherDomain(OTHER_DOMAIN);
-        assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+        assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
             .containsExactlyInAnyOrder(USER.asMailAddress(), userAliasOtherDomain.asMailAddress(), userAliasMainDomain.asMailAddress(), userOtherDomain.asMailAddress());
     }
 
@@ -252,7 +254,7 @@
         Username userAliasExcluded = Username.of("alias" + (recursionLevel - 1) + "@" + DOMAIN.asString());
         redirectUser(USER_ALIAS).to(USER);
 
-        assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+        assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
             .doesNotContain(userAliasExcluded.asMailAddress());
     }
 
@@ -263,7 +265,7 @@
         redirectUser(userAliasBis).to(userAlias);
         redirectUser(userAlias).to(USER);
 
-        assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+        assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
             .contains(userAliasBis.asMailAddress());
     }
 
@@ -273,7 +275,7 @@
         redirectUser(userAliasBis).to(USER_ALIAS);
         redirectUser(USER_ALIAS).to(USER);
 
-        assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+        assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
             .contains(userAliasBis.asMailAddress());
     }
 }
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index deacb92..6eda5a3 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -35,6 +35,8 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
+import reactor.core.publisher.Flux;
+
 public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable {
     private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
     private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
@@ -91,4 +93,11 @@
 
         return cassandraMappingsSourcesDAO.retrieveSources(mapping).toStream();
     }
+
+    @Override
+    public Flux<MappingSource> listSourcesReactive(Mapping mapping) {
+        Preconditions.checkArgument(listSourcesSupportedType.contains(mapping.getType()),
+            "Not supported mapping of type %s", mapping.getType());
+        return cassandraMappingsSourcesDAO.retrieveSources(mapping);
+    }
 }
diff --git a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
index a2ee0d5..f746c2c 100644
--- a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
+++ b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
@@ -26,11 +26,9 @@
 import org.apache.james.jmap.api.model.{EmailAddress, ForbiddenSendFromException, HtmlSignature, Identity, IdentityId, IdentityName, MayDeleteIdentity, TextSignature}
 import org.apache.james.rrt.api.CanSendFrom
 import org.apache.james.user.api.UsersRepository
-import org.apache.james.util.ReactorUtils
 import org.reactivestreams.Publisher
 import reactor.core.scala.publisher.{SFlux, SMono}
 
-import scala.jdk.StreamConverters._
 import scala.util.Try
 import scala.jdk.OptionConverters._
 import scala.jdk.CollectionConverters._
@@ -171,9 +169,8 @@
 }
 
 class DefaultIdentitySupplier @Inject()(canSendFrom: CanSendFrom, usersRepository: UsersRepository) {
-  def listIdentities(username: Username): List[Identity] = canSendFrom.allValidFromAddressesForUser(username)
-    .toScala(LazyList).toList
-    .flatMap(address =>
+  def listIdentities(username: Username): Publisher[Identity] = SFlux(canSendFrom.allValidFromAddressesForUser(username))
+    .map(address =>
       from(address).map(id =>
         Identity(
           id = id,
@@ -185,14 +182,12 @@
           htmlSignature = HtmlSignature.DEFAULT,
           mayDelete = MayDeleteIdentity(false),
           sortOrder = Identity.DEFAULT_SORTORDER)))
+    .flatMap(option => option.map(SMono.just).getOrElse(SMono.empty))
 
   def userCanSendFrom(username: Username, mailAddress: MailAddress): SMono[Boolean] =
     SMono.fromPublisher(canSendFrom.userCanSendFromReactive(username, usersRepository.getUsername(mailAddress)))
       .map(boolean2Boolean(_))
 
-  def isServerSetIdentity(username: Username, id: IdentityId): Boolean =
-    listIdentities(username).map(_.id).contains(id)
-
   private def from(address: MailAddress): Option[IdentityId] =
     Try(UUID.nameUUIDFromBytes(address.asString().getBytes(StandardCharsets.UTF_8)))
       .toEither
@@ -221,8 +216,8 @@
       .map(_.identity)
 
   private def listServerSetIdentity(user: Username): SMono[(Set[MailAddress], List[Identity])] =
-    SMono.fromCallable(() => identityFactory.listIdentities(user))
-      .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+    SFlux(identityFactory.listIdentities(user))
+      .collectSeq().map(s => s.toList)
       .map(list => (list.map(_.email).toSet, list))
 
   private def listCustomIdentity(user: Username, availableMailAddresses: Set[MailAddress]): SFlux[Identity] =
@@ -230,9 +225,9 @@
       .filter(identity => availableMailAddresses.contains(identity.email))
 
   def update(user: Username, identityId: IdentityId, identityUpdateRequest: IdentityUpdateRequest): Publisher[Unit] = {
-    val findServerSetIdentity: SMono[Option[Identity]] = SMono.fromCallable(() => identityFactory.listIdentities(user)
-      .find(identity => identity.id.equals(identityId)))
-      .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+    val findServerSetIdentity: SMono[Option[Identity]] = SFlux(identityFactory.listIdentities(user))
+      .collectSeq().map(s => s.toList)
+      .map(list => list.find(identity => identity.id.equals(identityId)))
     val findCustomIdentity: SMono[Option[Identity]] = SMono(customIdentityDao.findByIdentityId(user, identityId))
       .map(Some(_))
       .switchIfEmpty(SMono.just(None))
@@ -252,17 +247,20 @@
       .`then`()
   }
 
-  def delete(username: Username, ids: Set[IdentityId]): Publisher[Unit] =
-    SMono.just(ids)
-      .handle[Set[IdentityId]]{
-        case (ids, sink) => if (identityFactory.isServerSetIdentity(username, ids.head)) {
-          sink.error(IdentityForbiddenDeleteException(ids.head))
+  def delete(username: Username, ids: Set[IdentityId]): Publisher[Unit] = {
+    SFlux(identityFactory.listIdentities(username))
+      .map(_.id)
+      .collectSeq()
+      .flatMapMany(serverSetIdentities => SFlux.fromIterable(ids)
+        .handle[IdentityId] {
+        case (id, sink) => if (serverSetIdentities.contains(id)) {
+          sink.error(IdentityForbiddenDeleteException(id))
         } else {
-          sink.next(ids)
+          sink.next(id)
         }
-      }
-      .flatMap(ids => SMono.fromPublisher(customIdentityDao.delete(username, ids)))
-      .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+      }).collectSeq()
+      .flatMap(ids => SMono.fromPublisher(customIdentityDao.delete(username, ids.toSet)))
+  }
 }
 
 case class IdentityNotFoundException(id: IdentityId) extends RuntimeException(s"$id could not be found")
diff --git a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala
index a5d28c2..1bc9785 100644
--- a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala
+++ b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala
@@ -99,7 +99,7 @@
   @Test
   def listShouldReturnCustomAndServerSetEntries(): Unit = {
     val customIdentity: Identity = SMono.fromPublisher(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
-    when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
 
     assertThat(SFlux.fromPublisher(testee.list(BOB)).collectSeq().block().asJava)
       .containsExactlyInAnyOrder(IDENTITY1, customIdentity)
@@ -109,7 +109,7 @@
   def listShouldReturnCustomEntryWhenIdExistsInBothCustomAndServerSet(): Unit = {
     val customIdentity: Identity = SMono.fromPublisher(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
     val differentIdentityWithSameId: Identity = customIdentity.copy(name = IdentityName("different name"))
-    when(identityFactory.listIdentities(BOB)).thenReturn(List(differentIdentityWithSameId))
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(differentIdentityWithSameId))
 
     assertThat(SFlux.fromPublisher(testee.list(BOB)).collectSeq().block().asJava)
       .containsExactlyInAnyOrder(customIdentity.copy(mayDelete = MayDeleteIdentity(false)))
@@ -118,7 +118,7 @@
   @Test
   def listShouldNotReturnTheIdentityHasAliasNotExists(): Unit = {
     SMono(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
-    when(identityFactory.listIdentities(BOB)).thenReturn(List())
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.empty)
     assertThat(SFlux(testee.list(BOB)).collectSeq().block().asJava)
       .isEmpty()
   }
@@ -127,7 +127,7 @@
   def listShouldWorkWhenMixCases(): Unit = {
     val customIdentity: Identity = SMono(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
     val customIdentityHasEmailNotExist: Identity = SMono(customIdentityDAO.save(BOB, CREATION_REQUEST.copy(email = new MailAddress("bob2@domain.tld")))).block()
-    when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
 
     assertThat(SFlux(testee.list(BOB)).collectSeq().block().asJava)
       .containsExactlyInAnyOrder(customIdentity, IDENTITY1)
@@ -136,7 +136,7 @@
 
   @Test
   def updateShouldFailWhenIdNotFoundInBothCustomAndServerSetDAO(): Unit = {
-    when(identityFactory.listIdentities(BOB)).thenReturn(List())
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.empty)
 
     assertThatThrownBy(() => SMono.fromPublisher(testee.update(BOB, IdentityId.generate, UPDATE_REQUEST)).block())
       .isInstanceOf(classOf[IdentityNotFoundException])
@@ -144,7 +144,7 @@
 
   @Test
   def updateShouldSuccessWhenCustomNotFoundAndServerSetExists(): Unit = {
-    when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
 
     assertThatCode(() => SMono.fromPublisher(testee.update(BOB, IDENTITY1.id, UPDATE_REQUEST)).block())
       .doesNotThrowAnyException()
@@ -152,7 +152,7 @@
 
   @Test
   def updateShouldSuccessWhenCustomExistsAndAliasExists(): Unit = {
-    when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
     when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(true))
     val customIdentity: Identity = SMono(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
 
@@ -162,7 +162,7 @@
 
   @Test
   def updateShouldModifiedEntry(): Unit = {
-    when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
     when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(true))
 
     val customIdentity: Identity = SMono(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
@@ -176,7 +176,7 @@
 
   @Test
   def updateShouldSuccessWhenMultiUpdateServerSetId(): Unit = {
-    when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
     when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(true))
     SMono(testee.update(BOB, IDENTITY1.id, UPDATE_REQUEST)).block()
 
@@ -196,7 +196,7 @@
 
   @Test
   def updateShouldSuccessWhenSecondPartialUpdateServerSetId(): Unit = {
-    when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
     when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(true))
 
     SMono(testee.update(BOB, IDENTITY1.id, UPDATE_REQUEST)).block()
@@ -223,7 +223,7 @@
     val identity: Identity = SMono.fromPublisher(testee.save(BOB, CREATION_REQUEST)).block()
 
     when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(false))
-    when(identityFactory.listIdentities(BOB)).thenReturn(List())
+    when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.empty)
 
     assertThatThrownBy(() => SMono.fromPublisher(testee.update(BOB, identity.id, UPDATE_REQUEST)).block())
       .isInstanceOf(classOf[IdentityNotFoundException])
diff --git a/server/data/data-library/src/main/java/org/apache/james/rrt/lib/AliasReverseResolverImpl.java b/server/data/data-library/src/main/java/org/apache/james/rrt/lib/AliasReverseResolverImpl.java
index 273ac3e..6829097 100644
--- a/server/data/data-library/src/main/java/org/apache/james/rrt/lib/AliasReverseResolverImpl.java
+++ b/server/data/data-library/src/main/java/org/apache/james/rrt/lib/AliasReverseResolverImpl.java
@@ -19,25 +19,25 @@
 
 package org.apache.james.rrt.lib;
 
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Stream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import jakarta.inject.Inject;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.core.Domain;
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.Username;
 import org.apache.james.rrt.api.AliasReverseResolver;
 import org.apache.james.rrt.api.RecipientRewriteTable;
-import org.apache.james.rrt.api.RecipientRewriteTableException;
-import org.apache.james.util.StreamUtils;
+import org.apache.james.util.ReactorUtils;
 
 import com.github.fge.lambdas.Throwing;
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
 
 public class AliasReverseResolverImpl implements AliasReverseResolver {
     private final RecipientRewriteTable recipientRewriteTable;
@@ -48,58 +48,63 @@
     }
 
     @Override
-    public Stream<MailAddress> listAddresses(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException {
-        CanSendFromImpl.DomainFetcher domains = domainFetcher(user);
+    public Flux<MailAddress> listAddresses(Username user) {
+        CanSendFromImpl.DomainFetcher domains = domainFetcher();
 
         return relatedAliases(user)
-            .flatMap(allowedUser -> domains.fetch(allowedUser)
-                .stream()
-                .map(Optional::of)
-                .map(allowedUser::withOtherDomain)
-                .map(Throwing.function(Username::asMailAddress).sneakyThrow()))
+            .flatMap(allowedUser -> user.getDomainPart()
+                .map(domain -> Flux.concat(
+                    Flux.just(allowedUser),
+                    domains.fetch(domain).map(allowedUser::withOtherDomain)))
+                .orElseGet(() -> Flux.just(allowedUser)))
+            .map(Throwing.function(Username::asMailAddress).sneakyThrow())
             .distinct();
     }
 
-    private Stream<Username> relatedAliases(Username user) {
-        return StreamUtils.iterate(
-            user,
-            getMappingLimit(),
-            Throwing.<Username, Stream<Username>>function(targetUser ->
-                recipientRewriteTable
-                    .listSources(Mapping.alias(targetUser.asString()))
+    private Flux<Username> relatedAliases(Username user) {
+        Pair<Username, Integer> userWithDepth = Pair.of(user, 0);
+        return Flux.just(userWithDepth)
+            .expand(value -> {
+                if (value.getRight() >= getMappingLimit()) {
+                    return Flux.empty();
+                }
+                return recipientRewriteTable.listSourcesReactive(Mapping.alias(value.getLeft().asString()))
                     .map(MappingSource::asUsername)
-                    .flatMap(Optional::stream)).sneakyThrow()
-        );
+                    .handle(ReactorUtils.publishIfPresent())
+                    .map(u -> Pair.of(u, value.getRight() + 1));
+            }).map(Pair::getLeft);
     }
 
-    private CanSendFromImpl.DomainFetcher domainFetcher(Username user) {
-        HashMap<Domain, List<Domain>> fetchedDomains = new HashMap<>();
-        List<Domain> userDomains = relatedDomains(user).collect(ImmutableList.toImmutableList());
-        user.getDomainPart().ifPresent(domain -> fetchedDomains.put(domain, userDomains));
-        Function<Domain, List<Domain>> computeDomain = givenDomain -> Stream.concat(userDomains.stream(), fetchDomains(givenDomain)).collect(ImmutableList.toImmutableList());
-        return givenUsername ->
-            givenUsername
-                .getDomainPart()
-                .map(domain -> fetchedDomains.computeIfAbsent(domain, computeDomain))
-                .orElseGet(Arrays::asList);
+    private CanSendFromImpl.DomainFetcher domainFetcher() {
+        return new CanSendFromImpl.DomainFetcher() {
+            private final Map<Domain, List<Domain>> memoized = new ConcurrentHashMap<>();
+            @Override
+            public Flux<Domain> fetch(Domain domain) {
+                if (memoized.containsKey(domain)) {
+                    return Flux.fromIterable(memoized.get(domain));
+                }
+                return fetchDomains(domain)
+                    .collect(Collectors.toList())
+                    .doOnNext(next -> memoized.put(domain, next))
+                    .flatMapIterable(i -> i);
+            }
+        };
     }
 
-    private Stream<Domain> relatedDomains(Username user) {
-        return user.getDomainPart()
-            .map(this::fetchDomains)
-            .orElseGet(Stream::empty);
+    private Flux<Domain> fetchDomains(Domain domain) {
+        Pair<Domain, Integer> domainWithDepth = Pair.of(domain, 0);
+        Flux<Pair<Domain, Integer>> flux = Flux.just(domainWithDepth);
+        return expandDomains(flux);
     }
 
-    private Stream<Domain> fetchDomains(Domain domain) {
-        return StreamUtils.iterate(
-            domain,
-            getMappingLimit(),
-            Throwing.<Domain, Stream<Domain>>function(targetDomain ->
-                recipientRewriteTable
-                    .listSources(Mapping.domainAlias(targetDomain))
+    private Flux<Domain> expandDomains(Flux<Pair<Domain, Integer>> flux) {
+        return flux.expand(value -> {
+                Preconditions.checkArgument(value.getRight() < getMappingLimit());
+                return recipientRewriteTable.listSourcesReactive(Mapping.domainAlias(value.getKey()))
                     .map(MappingSource::asDomain)
-                    .flatMap(Optional::stream)).sneakyThrow()
-        );
+                    .handle(ReactorUtils.publishIfPresent())
+                    .map(u -> Pair.of(u, value.getRight() + 1));
+            }).map(Pair::getLeft);
     }
 
     private long getMappingLimit() {
diff --git a/server/data/data-library/src/main/java/org/apache/james/rrt/lib/CanSendFromImpl.java b/server/data/data-library/src/main/java/org/apache/james/rrt/lib/CanSendFromImpl.java
index 4407bb4..5e29e04 100644
--- a/server/data/data-library/src/main/java/org/apache/james/rrt/lib/CanSendFromImpl.java
+++ b/server/data/data-library/src/main/java/org/apache/james/rrt/lib/CanSendFromImpl.java
@@ -22,9 +22,7 @@
 import static org.apache.james.rrt.lib.Mapping.Type.DomainAlias;
 
 import java.util.EnumSet;
-import java.util.List;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 import jakarta.inject.Inject;
 
@@ -40,13 +38,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CanSendFromImpl implements CanSendFrom {
 
     @FunctionalInterface
     interface DomainFetcher {
-        List<Domain> fetch(Username user);
+        Flux<Domain> fetch(Domain user);
     }
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CanSendFromImpl.class);
@@ -80,8 +79,8 @@
     }
 
     @Override
-    public Stream<MailAddress> allValidFromAddressesForUser(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException {
-        return aliasReverseResolver.listAddresses(user);
+    public Flux<MailAddress> allValidFromAddressesForUser(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException {
+        return Flux.from(aliasReverseResolver.listAddresses(user));
     }
 
     private boolean emailIsAnAliasOfTheConnectedUser(Username connectedUser, Username fromUser) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException {
diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
index a06efcd..500e541 100644
--- a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
+++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
@@ -374,10 +374,10 @@
                     .haltError();
             }
 
-            return canSendFrom
-                .allValidFromAddressesForUser(username)
+            return Flux.from(canSendFrom.allValidFromAddressesForUser(username))
                 .map(MailAddress::asString)
-                .collect(ImmutableList.toImmutableList());
+                .collect(ImmutableList.toImmutableList())
+                .block();
         } catch (RecipientRewriteTable.ErrorMappingException | RecipientRewriteTableException | UsersRepositoryException e) {
             String errorMessage = String.format("Error while listing allowed From headers for user '%s'", username);
             LOGGER.info(errorMessage, e);
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java
index e9a7a04..d07729d 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java
@@ -57,7 +57,6 @@
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scala.publisher.SMono;
-import scala.jdk.javaapi.CollectionConverters;
 
 class UserIdentitiesRoutesTest {
 
@@ -95,7 +94,7 @@
     void listIdentitiesShouldReturnBothCustomAndServerSetIdentities() throws Exception {
         // identity: server set
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         IdentityCreationRequest creationRequest = IdentityCreationRequest.fromJava(
             BOB.asMailAddress(),
@@ -181,7 +180,7 @@
     void listIdentitiesShouldSupportDefaultParam() throws Exception {
         // identity: server set
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         Integer highPriorityOrder = 1;
         Integer lowPriorityOrder = 2;
@@ -248,7 +247,7 @@
     @Test
     void listIdentitiesShouldReturnBadRequestWhenInvalidDefaultParam() {
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         String response = given()
             .queryParam("default", "invalid")
@@ -272,7 +271,7 @@
     @Test
     void listIdentitiesShouldReturnNotFoundWhenCanNotQueryDefaultIdentity() {
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.<Identity>of()).toList());
+            .thenReturn(Flux.empty());
 
         String response = given()
             .queryParam("default", "true")
@@ -318,7 +317,7 @@
             "";
 
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         given()
             .body(creationRequest)
@@ -373,7 +372,7 @@
             "";
 
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         given()
             .body(creationRequest)
@@ -416,7 +415,7 @@
             "    }";
 
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         String response = given()
             .body(creationRequest)
@@ -438,7 +437,7 @@
     @Test
     void createIdentityShouldFailWhenInvalidRequest() {
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         String response = given()
             .body("invalid")
@@ -468,7 +467,7 @@
             "";
 
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         given()
             .body(creationRequest)
@@ -503,7 +502,7 @@
     @Test
     void updateIdentityShouldWork() throws Exception {
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         IdentityCreationRequest creationRequest = IdentityCreationRequest.fromJava(
             BOB.asMailAddress(),
@@ -582,7 +581,7 @@
     @Test
     void updateIdentityShouldFailWhenIdNotFound() {
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         String updateRequest = "" +
             "    {" +
@@ -628,7 +627,7 @@
     @Test
     void updateIdentityShouldNotModifyAbsentPropertyInRequest() throws Exception {
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         IdentityCreationRequest creationRequest = IdentityCreationRequest.fromJava(
             BOB.asMailAddress(),
@@ -695,7 +694,7 @@
     @Test
     void updateIdentityShouldNotAcceptChangeMayDeleteProperty() throws Exception {
         Mockito.when(identityFactory.listIdentities(BOB))
-            .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+            .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
 
         IdentityCreationRequest creationRequest = IdentityCreationRequest.fromJava(
             BOB.asMailAddress(),