[ENHANCEMENT] Better reactify Identity methods
diff --git a/server/data/data-api/src/main/java/org/apache/james/rrt/api/AliasReverseResolver.java b/server/data/data-api/src/main/java/org/apache/james/rrt/api/AliasReverseResolver.java
index 05117c5..0967209 100644
--- a/server/data/data-api/src/main/java/org/apache/james/rrt/api/AliasReverseResolver.java
+++ b/server/data/data-api/src/main/java/org/apache/james/rrt/api/AliasReverseResolver.java
@@ -19,11 +19,10 @@
package org.apache.james.rrt.api;
-import java.util.stream.Stream;
-
import org.apache.james.core.MailAddress;
import org.apache.james.core.Username;
+import org.reactivestreams.Publisher;
public interface AliasReverseResolver {
- Stream<MailAddress> listAddresses(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException;
+ Publisher<MailAddress> listAddresses(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException;
}
diff --git a/server/data/data-api/src/main/java/org/apache/james/rrt/api/CanSendFrom.java b/server/data/data-api/src/main/java/org/apache/james/rrt/api/CanSendFrom.java
index 9fee906..582376e 100644
--- a/server/data/data-api/src/main/java/org/apache/james/rrt/api/CanSendFrom.java
+++ b/server/data/data-api/src/main/java/org/apache/james/rrt/api/CanSendFrom.java
@@ -18,8 +18,6 @@
****************************************************************/
package org.apache.james.rrt.api;
-import java.util.stream.Stream;
-
import org.apache.james.core.MailAddress;
import org.apache.james.core.Username;
import org.reactivestreams.Publisher;
@@ -36,6 +34,6 @@
/**
* For a given user, return all the addresses he can use in the from clause of an email.
*/
- Stream<MailAddress> allValidFromAddressesForUser(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException;
+ Publisher<MailAddress> allValidFromAddressesForUser(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException;
}
diff --git a/server/data/data-api/src/main/java/org/apache/james/rrt/api/RecipientRewriteTable.java b/server/data/data-api/src/main/java/org/apache/james/rrt/api/RecipientRewriteTable.java
index e7506b2..01078df 100644
--- a/server/data/data-api/src/main/java/org/apache/james/rrt/api/RecipientRewriteTable.java
+++ b/server/data/data-api/src/main/java/org/apache/james/rrt/api/RecipientRewriteTable.java
@@ -31,6 +31,8 @@
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Flux;
+
/**
* Interface which should be implemented of classes which map recipients.
*/
@@ -138,6 +140,14 @@
.map(Map.Entry::getKey);
}
+ default Flux<MappingSource> listSourcesReactive(Mapping mapping) {
+ try {
+ return Flux.fromStream(listSources(mapping));
+ } catch (RecipientRewriteTableException e) {
+ return Flux.error(e);
+ }
+ }
+
default Stream<MappingSource> getSourcesForType(Mapping.Type type) throws RecipientRewriteTableException {
return getAllMappings()
.entrySet().stream()
diff --git a/server/data/data-api/src/test/java/org/apache/james/rrt/lib/AliasReverseResolverContract.java b/server/data/data-api/src/test/java/org/apache/james/rrt/lib/AliasReverseResolverContract.java
index 891fe46..3504256 100644
--- a/server/data/data-api/src/test/java/org/apache/james/rrt/lib/AliasReverseResolverContract.java
+++ b/server/data/data-api/src/test/java/org/apache/james/rrt/lib/AliasReverseResolverContract.java
@@ -30,6 +30,8 @@
import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Flux;
+
public interface AliasReverseResolverContract {
Domain DOMAIN = Domain.of("example.com");
@@ -66,14 +68,14 @@
@Test
default void listAddressesShouldContainOnlyUserAddressWhenUserHasNoAlias() throws Exception {
- assertThat(aliasReverseResolver().listAddresses(USER))
+ assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
.containsExactly(USER.asMailAddress());
}
@Test
default void listAddressesShouldContainOnlyUserAddressWhenUserHasNoAliasAndAnotherUserHasOne() throws Exception {
redirectUser(USER_ALIAS).to(OTHER_USER);
- assertThat(aliasReverseResolver().listAddresses(USER))
+ assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
.containsExactly(USER.asMailAddress());
}
@@ -81,7 +83,7 @@
default void listAddressesShouldContainUserAddressAndAnAliasOfTheUser() throws Exception {
redirectUser(USER_ALIAS).to(USER);
- assertThat(aliasReverseResolver().listAddresses(USER))
+ assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
.containsExactlyInAnyOrder(USER.asMailAddress(), USER_ALIAS.asMailAddress());
}
@@ -91,7 +93,7 @@
redirectUser(userAliasBis).to(USER_ALIAS);
redirectUser(USER_ALIAS).to(USER);
- assertThat(aliasReverseResolver().listAddresses(USER))
+ assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
.containsExactlyInAnyOrder(USER.asMailAddress(), USER_ALIAS.asMailAddress(), userAliasBis.asMailAddress());
}
@@ -101,7 +103,7 @@
redirectDomain(OTHER_DOMAIN).to(DOMAIN);
- assertThat(aliasReverseResolver().listAddresses(USER))
+ assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
.containsExactlyInAnyOrder(USER.asMailAddress(), fromUser.asMailAddress());
}
@@ -114,7 +116,7 @@
Username userAliasMainDomain = USER_ALIAS.withOtherDomain(DOMAIN);
Username userOtherDomain = USER.withOtherDomain(OTHER_DOMAIN);
- assertThat(aliasReverseResolver().listAddresses(USER))
+ assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
.containsExactlyInAnyOrder(USER.asMailAddress(), userAliasOtherDomain.asMailAddress(), userAliasMainDomain.asMailAddress(), userOtherDomain.asMailAddress());
}
@@ -136,7 +138,7 @@
Username userAliasExcluded = Username.of("alias" + (recursionLevel - 1) + "@" + DOMAIN.asString());
redirectUser(USER_ALIAS).to(USER);
- assertThat(aliasReverseResolver().listAddresses(USER))
+ assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
.doesNotContain(userAliasExcluded.asMailAddress());
}
@@ -147,7 +149,7 @@
redirectUser(userAliasBis).to(userAlias);
redirectUser(userAlias).to(USER);
- assertThat(aliasReverseResolver().listAddresses(USER))
+ assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
.contains(userAliasBis.asMailAddress());
}
@@ -157,7 +159,7 @@
redirectUser(userAliasBis).to(USER_ALIAS);
redirectUser(USER_ALIAS).to(USER);
- assertThat(aliasReverseResolver().listAddresses(USER))
+ assertThat(Flux.from(aliasReverseResolver().listAddresses(USER)).toStream())
.contains(userAliasBis.asMailAddress());
}
}
diff --git a/server/data/data-api/src/test/java/org/apache/james/rrt/lib/CanSendFromContract.java b/server/data/data-api/src/test/java/org/apache/james/rrt/lib/CanSendFromContract.java
index 3262131..1127efd 100644
--- a/server/data/data-api/src/test/java/org/apache/james/rrt/lib/CanSendFromContract.java
+++ b/server/data/data-api/src/test/java/org/apache/james/rrt/lib/CanSendFromContract.java
@@ -29,6 +29,8 @@
import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Flux;
+
public interface CanSendFromContract {
Domain DOMAIN = Domain.of("example.com");
@@ -173,14 +175,14 @@
@Test
default void allValidFromAddressesShouldContainOnlyUserAddressWhenUserHasNoAlias() throws Exception {
- assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+ assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
.containsExactly(USER.asMailAddress());
}
@Test
default void allValidFromAddressesShouldContainOnlyUserAddressWhenUserHasNoAliasAndAnotherUserHasOne() throws Exception {
redirectUser(USER_ALIAS).to(OTHER_USER);
- assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+ assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
.containsExactly(USER.asMailAddress());
}
@@ -188,7 +190,7 @@
default void allValidFromAddressesShouldContainUserAddressAndAnAliasOfTheUser() throws Exception {
redirectUser(USER_ALIAS).to(USER);
- assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+ assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
.containsExactlyInAnyOrder(USER.asMailAddress(), USER_ALIAS.asMailAddress());
}
@@ -198,7 +200,7 @@
redirectUser(userAliasBis).to(USER_ALIAS);
redirectUser(USER_ALIAS).to(USER);
- assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+ assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
.containsExactlyInAnyOrder(USER.asMailAddress(), USER_ALIAS.asMailAddress(), userAliasBis.asMailAddress());
}
@@ -208,7 +210,7 @@
redirectDomain(OTHER_DOMAIN).to(DOMAIN).asAlias();
- assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+ assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
.containsExactlyInAnyOrder(USER.asMailAddress(), fromUser.asMailAddress());
}
@@ -221,7 +223,7 @@
Username userAliasMainDomain = USER_ALIAS.withOtherDomain(DOMAIN);
Username userOtherDomain = USER.withOtherDomain(OTHER_DOMAIN);
- assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+ assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
.containsExactlyInAnyOrder(USER.asMailAddress(), userAliasOtherDomain.asMailAddress(), userAliasMainDomain.asMailAddress(), userOtherDomain.asMailAddress());
}
@@ -252,7 +254,7 @@
Username userAliasExcluded = Username.of("alias" + (recursionLevel - 1) + "@" + DOMAIN.asString());
redirectUser(USER_ALIAS).to(USER);
- assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+ assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
.doesNotContain(userAliasExcluded.asMailAddress());
}
@@ -263,7 +265,7 @@
redirectUser(userAliasBis).to(userAlias);
redirectUser(userAlias).to(USER);
- assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+ assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
.contains(userAliasBis.asMailAddress());
}
@@ -273,7 +275,7 @@
redirectUser(userAliasBis).to(USER_ALIAS);
redirectUser(USER_ALIAS).to(USER);
- assertThat(canSendFrom().allValidFromAddressesForUser(USER))
+ assertThat(Flux.from(canSendFrom().allValidFromAddressesForUser(USER)).toStream())
.contains(userAliasBis.asMailAddress());
}
}
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index deacb92..6eda5a3 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -35,6 +35,8 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Flux;
+
public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable {
private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
@@ -91,4 +93,11 @@
return cassandraMappingsSourcesDAO.retrieveSources(mapping).toStream();
}
+
+ @Override
+ public Flux<MappingSource> listSourcesReactive(Mapping mapping) {
+ Preconditions.checkArgument(listSourcesSupportedType.contains(mapping.getType()),
+ "Not supported mapping of type %s", mapping.getType());
+ return cassandraMappingsSourcesDAO.retrieveSources(mapping);
+ }
}
diff --git a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
index a2ee0d5..f746c2c 100644
--- a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
+++ b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
@@ -26,11 +26,9 @@
import org.apache.james.jmap.api.model.{EmailAddress, ForbiddenSendFromException, HtmlSignature, Identity, IdentityId, IdentityName, MayDeleteIdentity, TextSignature}
import org.apache.james.rrt.api.CanSendFrom
import org.apache.james.user.api.UsersRepository
-import org.apache.james.util.ReactorUtils
import org.reactivestreams.Publisher
import reactor.core.scala.publisher.{SFlux, SMono}
-import scala.jdk.StreamConverters._
import scala.util.Try
import scala.jdk.OptionConverters._
import scala.jdk.CollectionConverters._
@@ -171,9 +169,8 @@
}
class DefaultIdentitySupplier @Inject()(canSendFrom: CanSendFrom, usersRepository: UsersRepository) {
- def listIdentities(username: Username): List[Identity] = canSendFrom.allValidFromAddressesForUser(username)
- .toScala(LazyList).toList
- .flatMap(address =>
+ def listIdentities(username: Username): Publisher[Identity] = SFlux(canSendFrom.allValidFromAddressesForUser(username))
+ .map(address =>
from(address).map(id =>
Identity(
id = id,
@@ -185,14 +182,12 @@
htmlSignature = HtmlSignature.DEFAULT,
mayDelete = MayDeleteIdentity(false),
sortOrder = Identity.DEFAULT_SORTORDER)))
+ .flatMap(option => option.map(SMono.just).getOrElse(SMono.empty))
def userCanSendFrom(username: Username, mailAddress: MailAddress): SMono[Boolean] =
SMono.fromPublisher(canSendFrom.userCanSendFromReactive(username, usersRepository.getUsername(mailAddress)))
.map(boolean2Boolean(_))
- def isServerSetIdentity(username: Username, id: IdentityId): Boolean =
- listIdentities(username).map(_.id).contains(id)
-
private def from(address: MailAddress): Option[IdentityId] =
Try(UUID.nameUUIDFromBytes(address.asString().getBytes(StandardCharsets.UTF_8)))
.toEither
@@ -221,8 +216,8 @@
.map(_.identity)
private def listServerSetIdentity(user: Username): SMono[(Set[MailAddress], List[Identity])] =
- SMono.fromCallable(() => identityFactory.listIdentities(user))
- .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+ SFlux(identityFactory.listIdentities(user))
+ .collectSeq().map(s => s.toList)
.map(list => (list.map(_.email).toSet, list))
private def listCustomIdentity(user: Username, availableMailAddresses: Set[MailAddress]): SFlux[Identity] =
@@ -230,9 +225,9 @@
.filter(identity => availableMailAddresses.contains(identity.email))
def update(user: Username, identityId: IdentityId, identityUpdateRequest: IdentityUpdateRequest): Publisher[Unit] = {
- val findServerSetIdentity: SMono[Option[Identity]] = SMono.fromCallable(() => identityFactory.listIdentities(user)
- .find(identity => identity.id.equals(identityId)))
- .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+ val findServerSetIdentity: SMono[Option[Identity]] = SFlux(identityFactory.listIdentities(user))
+ .collectSeq().map(s => s.toList)
+ .map(list => list.find(identity => identity.id.equals(identityId)))
val findCustomIdentity: SMono[Option[Identity]] = SMono(customIdentityDao.findByIdentityId(user, identityId))
.map(Some(_))
.switchIfEmpty(SMono.just(None))
@@ -252,17 +247,20 @@
.`then`()
}
- def delete(username: Username, ids: Set[IdentityId]): Publisher[Unit] =
- SMono.just(ids)
- .handle[Set[IdentityId]]{
- case (ids, sink) => if (identityFactory.isServerSetIdentity(username, ids.head)) {
- sink.error(IdentityForbiddenDeleteException(ids.head))
+ def delete(username: Username, ids: Set[IdentityId]): Publisher[Unit] = {
+ SFlux(identityFactory.listIdentities(username))
+ .map(_.id)
+ .collectSeq()
+ .flatMapMany(serverSetIdentities => SFlux.fromIterable(ids)
+ .handle[IdentityId] {
+ case (id, sink) => if (serverSetIdentities.contains(id)) {
+ sink.error(IdentityForbiddenDeleteException(id))
} else {
- sink.next(ids)
+ sink.next(id)
}
- }
- .flatMap(ids => SMono.fromPublisher(customIdentityDao.delete(username, ids)))
- .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+ }).collectSeq()
+ .flatMap(ids => SMono.fromPublisher(customIdentityDao.delete(username, ids.toSet)))
+ }
}
case class IdentityNotFoundException(id: IdentityId) extends RuntimeException(s"$id could not be found")
diff --git a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala
index a5d28c2..1bc9785 100644
--- a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala
+++ b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala
@@ -99,7 +99,7 @@
@Test
def listShouldReturnCustomAndServerSetEntries(): Unit = {
val customIdentity: Identity = SMono.fromPublisher(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
- when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
assertThat(SFlux.fromPublisher(testee.list(BOB)).collectSeq().block().asJava)
.containsExactlyInAnyOrder(IDENTITY1, customIdentity)
@@ -109,7 +109,7 @@
def listShouldReturnCustomEntryWhenIdExistsInBothCustomAndServerSet(): Unit = {
val customIdentity: Identity = SMono.fromPublisher(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
val differentIdentityWithSameId: Identity = customIdentity.copy(name = IdentityName("different name"))
- when(identityFactory.listIdentities(BOB)).thenReturn(List(differentIdentityWithSameId))
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(differentIdentityWithSameId))
assertThat(SFlux.fromPublisher(testee.list(BOB)).collectSeq().block().asJava)
.containsExactlyInAnyOrder(customIdentity.copy(mayDelete = MayDeleteIdentity(false)))
@@ -118,7 +118,7 @@
@Test
def listShouldNotReturnTheIdentityHasAliasNotExists(): Unit = {
SMono(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
- when(identityFactory.listIdentities(BOB)).thenReturn(List())
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.empty)
assertThat(SFlux(testee.list(BOB)).collectSeq().block().asJava)
.isEmpty()
}
@@ -127,7 +127,7 @@
def listShouldWorkWhenMixCases(): Unit = {
val customIdentity: Identity = SMono(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
val customIdentityHasEmailNotExist: Identity = SMono(customIdentityDAO.save(BOB, CREATION_REQUEST.copy(email = new MailAddress("bob2@domain.tld")))).block()
- when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
assertThat(SFlux(testee.list(BOB)).collectSeq().block().asJava)
.containsExactlyInAnyOrder(customIdentity, IDENTITY1)
@@ -136,7 +136,7 @@
@Test
def updateShouldFailWhenIdNotFoundInBothCustomAndServerSetDAO(): Unit = {
- when(identityFactory.listIdentities(BOB)).thenReturn(List())
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.empty)
assertThatThrownBy(() => SMono.fromPublisher(testee.update(BOB, IdentityId.generate, UPDATE_REQUEST)).block())
.isInstanceOf(classOf[IdentityNotFoundException])
@@ -144,7 +144,7 @@
@Test
def updateShouldSuccessWhenCustomNotFoundAndServerSetExists(): Unit = {
- when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
assertThatCode(() => SMono.fromPublisher(testee.update(BOB, IDENTITY1.id, UPDATE_REQUEST)).block())
.doesNotThrowAnyException()
@@ -152,7 +152,7 @@
@Test
def updateShouldSuccessWhenCustomExistsAndAliasExists(): Unit = {
- when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(true))
val customIdentity: Identity = SMono(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
@@ -162,7 +162,7 @@
@Test
def updateShouldModifiedEntry(): Unit = {
- when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(true))
val customIdentity: Identity = SMono(customIdentityDAO.save(BOB, CREATION_REQUEST)).block()
@@ -176,7 +176,7 @@
@Test
def updateShouldSuccessWhenMultiUpdateServerSetId(): Unit = {
- when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(true))
SMono(testee.update(BOB, IDENTITY1.id, UPDATE_REQUEST)).block()
@@ -196,7 +196,7 @@
@Test
def updateShouldSuccessWhenSecondPartialUpdateServerSetId(): Unit = {
- when(identityFactory.listIdentities(BOB)).thenReturn(List(IDENTITY1))
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.just(IDENTITY1))
when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(true))
SMono(testee.update(BOB, IDENTITY1.id, UPDATE_REQUEST)).block()
@@ -223,7 +223,7 @@
val identity: Identity = SMono.fromPublisher(testee.save(BOB, CREATION_REQUEST)).block()
when(identityFactory.userCanSendFrom(BOB, BOB.asMailAddress())).thenReturn(SMono.just(false))
- when(identityFactory.listIdentities(BOB)).thenReturn(List())
+ when(identityFactory.listIdentities(BOB)).thenReturn(SFlux.empty)
assertThatThrownBy(() => SMono.fromPublisher(testee.update(BOB, identity.id, UPDATE_REQUEST)).block())
.isInstanceOf(classOf[IdentityNotFoundException])
diff --git a/server/data/data-library/src/main/java/org/apache/james/rrt/lib/AliasReverseResolverImpl.java b/server/data/data-library/src/main/java/org/apache/james/rrt/lib/AliasReverseResolverImpl.java
index 273ac3e..6829097 100644
--- a/server/data/data-library/src/main/java/org/apache/james/rrt/lib/AliasReverseResolverImpl.java
+++ b/server/data/data-library/src/main/java/org/apache/james/rrt/lib/AliasReverseResolverImpl.java
@@ -19,25 +19,25 @@
package org.apache.james.rrt.lib;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Stream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import jakarta.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.core.Domain;
import org.apache.james.core.MailAddress;
import org.apache.james.core.Username;
import org.apache.james.rrt.api.AliasReverseResolver;
import org.apache.james.rrt.api.RecipientRewriteTable;
-import org.apache.james.rrt.api.RecipientRewriteTableException;
-import org.apache.james.util.StreamUtils;
+import org.apache.james.util.ReactorUtils;
import com.github.fge.lambdas.Throwing;
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
public class AliasReverseResolverImpl implements AliasReverseResolver {
private final RecipientRewriteTable recipientRewriteTable;
@@ -48,58 +48,63 @@
}
@Override
- public Stream<MailAddress> listAddresses(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException {
- CanSendFromImpl.DomainFetcher domains = domainFetcher(user);
+ public Flux<MailAddress> listAddresses(Username user) {
+ CanSendFromImpl.DomainFetcher domains = domainFetcher();
return relatedAliases(user)
- .flatMap(allowedUser -> domains.fetch(allowedUser)
- .stream()
- .map(Optional::of)
- .map(allowedUser::withOtherDomain)
- .map(Throwing.function(Username::asMailAddress).sneakyThrow()))
+ .flatMap(allowedUser -> user.getDomainPart()
+ .map(domain -> Flux.concat(
+ Flux.just(allowedUser),
+ domains.fetch(domain).map(allowedUser::withOtherDomain)))
+ .orElseGet(() -> Flux.just(allowedUser)))
+ .map(Throwing.function(Username::asMailAddress).sneakyThrow())
.distinct();
}
- private Stream<Username> relatedAliases(Username user) {
- return StreamUtils.iterate(
- user,
- getMappingLimit(),
- Throwing.<Username, Stream<Username>>function(targetUser ->
- recipientRewriteTable
- .listSources(Mapping.alias(targetUser.asString()))
+ private Flux<Username> relatedAliases(Username user) {
+ Pair<Username, Integer> userWithDepth = Pair.of(user, 0);
+ return Flux.just(userWithDepth)
+ .expand(value -> {
+ if (value.getRight() >= getMappingLimit()) {
+ return Flux.empty();
+ }
+ return recipientRewriteTable.listSourcesReactive(Mapping.alias(value.getLeft().asString()))
.map(MappingSource::asUsername)
- .flatMap(Optional::stream)).sneakyThrow()
- );
+ .handle(ReactorUtils.publishIfPresent())
+ .map(u -> Pair.of(u, value.getRight() + 1));
+ }).map(Pair::getLeft);
}
- private CanSendFromImpl.DomainFetcher domainFetcher(Username user) {
- HashMap<Domain, List<Domain>> fetchedDomains = new HashMap<>();
- List<Domain> userDomains = relatedDomains(user).collect(ImmutableList.toImmutableList());
- user.getDomainPart().ifPresent(domain -> fetchedDomains.put(domain, userDomains));
- Function<Domain, List<Domain>> computeDomain = givenDomain -> Stream.concat(userDomains.stream(), fetchDomains(givenDomain)).collect(ImmutableList.toImmutableList());
- return givenUsername ->
- givenUsername
- .getDomainPart()
- .map(domain -> fetchedDomains.computeIfAbsent(domain, computeDomain))
- .orElseGet(Arrays::asList);
+ private CanSendFromImpl.DomainFetcher domainFetcher() {
+ return new CanSendFromImpl.DomainFetcher() {
+ private final Map<Domain, List<Domain>> memoized = new ConcurrentHashMap<>();
+ @Override
+ public Flux<Domain> fetch(Domain domain) {
+ if (memoized.containsKey(domain)) {
+ return Flux.fromIterable(memoized.get(domain));
+ }
+ return fetchDomains(domain)
+ .collect(Collectors.toList())
+ .doOnNext(next -> memoized.put(domain, next))
+ .flatMapIterable(i -> i);
+ }
+ };
}
- private Stream<Domain> relatedDomains(Username user) {
- return user.getDomainPart()
- .map(this::fetchDomains)
- .orElseGet(Stream::empty);
+ private Flux<Domain> fetchDomains(Domain domain) {
+ Pair<Domain, Integer> domainWithDepth = Pair.of(domain, 0);
+ Flux<Pair<Domain, Integer>> flux = Flux.just(domainWithDepth);
+ return expandDomains(flux);
}
- private Stream<Domain> fetchDomains(Domain domain) {
- return StreamUtils.iterate(
- domain,
- getMappingLimit(),
- Throwing.<Domain, Stream<Domain>>function(targetDomain ->
- recipientRewriteTable
- .listSources(Mapping.domainAlias(targetDomain))
+ private Flux<Domain> expandDomains(Flux<Pair<Domain, Integer>> flux) {
+ return flux.expand(value -> {
+ Preconditions.checkArgument(value.getRight() < getMappingLimit());
+ return recipientRewriteTable.listSourcesReactive(Mapping.domainAlias(value.getKey()))
.map(MappingSource::asDomain)
- .flatMap(Optional::stream)).sneakyThrow()
- );
+ .handle(ReactorUtils.publishIfPresent())
+ .map(u -> Pair.of(u, value.getRight() + 1));
+ }).map(Pair::getLeft);
}
private long getMappingLimit() {
diff --git a/server/data/data-library/src/main/java/org/apache/james/rrt/lib/CanSendFromImpl.java b/server/data/data-library/src/main/java/org/apache/james/rrt/lib/CanSendFromImpl.java
index 4407bb4..5e29e04 100644
--- a/server/data/data-library/src/main/java/org/apache/james/rrt/lib/CanSendFromImpl.java
+++ b/server/data/data-library/src/main/java/org/apache/james/rrt/lib/CanSendFromImpl.java
@@ -22,9 +22,7 @@
import static org.apache.james.rrt.lib.Mapping.Type.DomainAlias;
import java.util.EnumSet;
-import java.util.List;
import java.util.Optional;
-import java.util.stream.Stream;
import jakarta.inject.Inject;
@@ -40,13 +38,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class CanSendFromImpl implements CanSendFrom {
@FunctionalInterface
interface DomainFetcher {
- List<Domain> fetch(Username user);
+ Flux<Domain> fetch(Domain user);
}
private static final Logger LOGGER = LoggerFactory.getLogger(CanSendFromImpl.class);
@@ -80,8 +79,8 @@
}
@Override
- public Stream<MailAddress> allValidFromAddressesForUser(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException {
- return aliasReverseResolver.listAddresses(user);
+ public Flux<MailAddress> allValidFromAddressesForUser(Username user) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException {
+ return Flux.from(aliasReverseResolver.listAddresses(user));
}
private boolean emailIsAnAliasOfTheConnectedUser(Username connectedUser, Username fromUser) throws RecipientRewriteTable.ErrorMappingException, RecipientRewriteTableException {
diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
index a06efcd..500e541 100644
--- a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
+++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
@@ -374,10 +374,10 @@
.haltError();
}
- return canSendFrom
- .allValidFromAddressesForUser(username)
+ return Flux.from(canSendFrom.allValidFromAddressesForUser(username))
.map(MailAddress::asString)
- .collect(ImmutableList.toImmutableList());
+ .collect(ImmutableList.toImmutableList())
+ .block();
} catch (RecipientRewriteTable.ErrorMappingException | RecipientRewriteTableException | UsersRepositoryException e) {
String errorMessage = String.format("Error while listing allowed From headers for user '%s'", username);
LOGGER.info(errorMessage, e);
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java
index e9a7a04..d07729d 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java
@@ -57,7 +57,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scala.publisher.SMono;
-import scala.jdk.javaapi.CollectionConverters;
class UserIdentitiesRoutesTest {
@@ -95,7 +94,7 @@
void listIdentitiesShouldReturnBothCustomAndServerSetIdentities() throws Exception {
// identity: server set
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
IdentityCreationRequest creationRequest = IdentityCreationRequest.fromJava(
BOB.asMailAddress(),
@@ -181,7 +180,7 @@
void listIdentitiesShouldSupportDefaultParam() throws Exception {
// identity: server set
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
Integer highPriorityOrder = 1;
Integer lowPriorityOrder = 2;
@@ -248,7 +247,7 @@
@Test
void listIdentitiesShouldReturnBadRequestWhenInvalidDefaultParam() {
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
String response = given()
.queryParam("default", "invalid")
@@ -272,7 +271,7 @@
@Test
void listIdentitiesShouldReturnNotFoundWhenCanNotQueryDefaultIdentity() {
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.<Identity>of()).toList());
+ .thenReturn(Flux.empty());
String response = given()
.queryParam("default", "true")
@@ -318,7 +317,7 @@
"";
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
given()
.body(creationRequest)
@@ -373,7 +372,7 @@
"";
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
given()
.body(creationRequest)
@@ -416,7 +415,7 @@
" }";
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
String response = given()
.body(creationRequest)
@@ -438,7 +437,7 @@
@Test
void createIdentityShouldFailWhenInvalidRequest() {
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
String response = given()
.body("invalid")
@@ -468,7 +467,7 @@
"";
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
given()
.body(creationRequest)
@@ -503,7 +502,7 @@
@Test
void updateIdentityShouldWork() throws Exception {
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
IdentityCreationRequest creationRequest = IdentityCreationRequest.fromJava(
BOB.asMailAddress(),
@@ -582,7 +581,7 @@
@Test
void updateIdentityShouldFailWhenIdNotFound() {
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
String updateRequest = "" +
" {" +
@@ -628,7 +627,7 @@
@Test
void updateIdentityShouldNotModifyAbsentPropertyInRequest() throws Exception {
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
IdentityCreationRequest creationRequest = IdentityCreationRequest.fromJava(
BOB.asMailAddress(),
@@ -695,7 +694,7 @@
@Test
void updateIdentityShouldNotAcceptChangeMayDeleteProperty() throws Exception {
Mockito.when(identityFactory.listIdentities(BOB))
- .thenReturn(CollectionConverters.asScala(List.of(IdentityRepositoryTest.IDENTITY1())).toList());
+ .thenReturn(Flux.just(IdentityRepositoryTest.IDENTITY1()));
IdentityCreationRequest creationRequest = IdentityCreationRequest.fromJava(
BOB.asMailAddress(),