[ENHANCEMENT] Reprocess for a specific recipient (#2226)

diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
index 8c74e23..0d5aef8 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
@@ -33,6 +33,7 @@
 import jakarta.mail.internet.MimeMessage;
 
 import org.apache.commons.io.output.CountingOutputStream;
+import org.apache.james.core.MailAddress;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.mailrepository.api.MailRepositoryStore;
@@ -62,6 +63,7 @@
 import org.apache.james.webadmin.utils.Responses;
 import org.eclipse.jetty.http.HttpStatus;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -331,6 +333,7 @@
         return new ReprocessingService.Configuration(parseTargetQueue(request),
             parseTargetProcessor(request),
             parseMaxRetries(request),
+            parseForRecipient(request),
             parseConsume(request).orElse(true),
             parseLimit(request));
     }
@@ -363,6 +366,11 @@
         return Optional.ofNullable(request.queryParams("processor"));
     }
 
+    private Optional<MailAddress> parseForRecipient(Request request) {
+        return Optional.ofNullable(request.queryParams("forRecipient"))
+            .map(Throwing.function(MailAddress::new));
+    }
+
     private Optional<Boolean> parseConsume(Request request) {
         return Optional.ofNullable(request.queryParams("consume"))
             .map(Boolean::parseBoolean);
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
index 312996a..dc8f02b 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
@@ -21,6 +21,7 @@
 import java.time.Instant;
 import java.util.Optional;
 
+import org.apache.james.core.MailAddress;
 import org.apache.james.json.DTOModule;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.queue.api.MailQueueName;
@@ -29,6 +30,7 @@
 import org.apache.james.util.streams.Limit;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
 
 public class ReprocessingAllMailsTaskAdditionalInformationDTO implements AdditionalInformationDTO {
     public static AdditionalInformationDTOModule<ReprocessingAllMailsTask.AdditionalInformation, ReprocessingAllMailsTaskAdditionalInformationDTO> module() {
@@ -40,6 +42,7 @@
                     MailQueueName.of(dto.getTargetQueue()),
                     dto.getTargetProcessor(),
                     dto.getMaxRetries(),
+                    dto.forRecipient.map(Throwing.function(MailAddress::new)),
                     dto.isConsume(),
                     Limit.from(dto.getLimit())),
                 dto.initialCount,
@@ -50,6 +53,7 @@
                 details.getRepositoryPath(),
                 details.getConfiguration().getMailQueueName().asString(),
                 details.getConfiguration().getTargetProcessor(),
+                details.getConfiguration().getForRecipient().map(MailAddress::asString),
                 Optional.of(details.getConfiguration().isConsume()),
                 details.getInitialCount(),
                 details.getRemainingCount(),
@@ -64,6 +68,7 @@
     private final String repositoryPath;
     private final String targetQueue;
     private final Optional<String> targetProcessor;
+    private final Optional<String> forRecipient;
     private final boolean consume;
     private final long initialCount;
     private final long remainingCount;
@@ -76,6 +81,7 @@
         @JsonProperty("repositoryPath") String repositoryPath,
         @JsonProperty("targetQueue") String targetQueue,
         @JsonProperty("targetProcessor") Optional<String> targetProcessor,
+        @JsonProperty("forRecipient") Optional<String> forRecipient,
         @JsonProperty("consume") Optional<Boolean> consume,
         @JsonProperty("initialCount") long initialCount,
         @JsonProperty("remainingCount") long remainingCount,
@@ -86,6 +92,7 @@
         this.repositoryPath = repositoryPath;
         this.targetQueue = targetQueue;
         this.targetProcessor = targetProcessor;
+        this.forRecipient = forRecipient;
         this.initialCount = initialCount;
         this.remainingCount = remainingCount;
         this.timestamp = timestamp;
@@ -131,6 +138,10 @@
         return targetProcessor;
     }
 
+    public Optional<String> getForRecipient() {
+        return forRecipient;
+    }
+
     public Optional<Integer> getLimit() {
         return limit;
     }
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
index 7784e5c..bdc2066 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
@@ -20,6 +20,7 @@
 
 import java.util.Optional;
 
+import org.apache.james.core.MailAddress;
 import org.apache.james.json.DTOModule;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.queue.api.MailQueueName;
@@ -28,6 +29,7 @@
 import org.apache.james.util.streams.Limit;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
 
 public class ReprocessingAllMailsTaskDTO implements TaskDTO {
 
@@ -50,6 +52,7 @@
                 domainObject.getConfiguration().getMailQueueName().asString(),
                 Optional.of(domainObject.getConfiguration().isConsume()),
                 domainObject.getConfiguration().getTargetProcessor(),
+                domainObject.getConfiguration().getForRecipient().map(MailAddress::asString),
                 domainObject.getConfiguration().getLimit().getLimit(),
                 domainObject.getConfiguration().getMaxRetries());
         } catch (Exception e) {
@@ -63,6 +66,7 @@
     private final String targetQueue;
     private final boolean consume;
     private final Optional<String> targetProcessor;
+    private final Optional<String> forRecipient;
     private final Optional<Integer> limit;
     private final Optional<Integer> maxRetries;
 
@@ -72,6 +76,7 @@
                                        @JsonProperty("targetQueue") String targetQueue,
                                        @JsonProperty("consume") Optional<Boolean> consume,
                                        @JsonProperty("targetProcessor") Optional<String> targetProcessor,
+                                       @JsonProperty("forRecipient") Optional<String> forRecipient,
                                        @JsonProperty("limit") Optional<Integer> limit,
                                        @JsonProperty("maxRetries") Optional<Integer> maxRetries) {
         this.type = type;
@@ -80,6 +85,7 @@
         this.targetQueue = targetQueue;
         this.consume = consume.orElse(true);
         this.targetProcessor = targetProcessor;
+        this.forRecipient = forRecipient;
         this.limit = limit;
         this.maxRetries = maxRetries;
     }
@@ -94,6 +100,7 @@
                     MailQueueName.of(targetQueue),
                     targetProcessor,
                     maxRetries,
+                    forRecipient.map(Throwing.function(MailAddress::new)),
                     consume,
                     Limit.from(limit)));
         } catch (Exception e) {
@@ -126,6 +133,10 @@
         return consume;
     }
 
+    public Optional<String> getForRecipient() {
+        return forRecipient;
+    }
+
     public Optional<String> getTargetProcessor() {
         return targetProcessor;
     }
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
index 74d3809..a6c1520 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
@@ -21,6 +21,7 @@
 import java.time.Instant;
 import java.util.Optional;
 
+import org.apache.james.core.MailAddress;
 import org.apache.james.json.DTOModule;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
@@ -30,6 +31,7 @@
 import org.apache.james.util.streams.Limit;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
 
 public class ReprocessingOneMailTaskAdditionalInformationDTO implements AdditionalInformationDTO {
 
@@ -44,6 +46,7 @@
                     MailQueueName.of(dto.targetQueue),
                     dto.targetProcessor,
                     NO_MAX_RETRIES,
+                    dto.forRecipient.map(Throwing.function(MailAddress::new)),
                     dto.isConsume(),
                     Limit.unlimited()),
                 new MailKey(dto.mailKey),
@@ -55,6 +58,7 @@
                 details.getMailKey(),
                 Optional.of(details.getConfiguration().isConsume()),
                 details.getConfiguration().getTargetProcessor(),
+                details.getConfiguration().getForRecipient().map(MailAddress::asString),
                 details.timestamp()))
             .typeName(ReprocessingOneMailTask.TYPE.asString())
             .withFactory(AdditionalInformationDTOModule::new);
@@ -65,6 +69,7 @@
     private final String targetQueue;
     private final String mailKey;
     private final Optional<String> targetProcessor;
+    private final Optional<String> forRecipient;
     private final boolean consume;
     private final Instant timestamp;
 
@@ -74,6 +79,7 @@
                                                            @JsonProperty("mailKey") String mailKey,
                                                            @JsonProperty("consume") Optional<Boolean> consume,
                                                            @JsonProperty("targetProcessor") Optional<String> targetProcessor,
+                                                           @JsonProperty("forRecipient") Optional<String> forRecipient,
                                                            @JsonProperty("timestamp") Instant timestamp) {
         this.type = type;
         this.consume = consume.orElse(true);
@@ -81,6 +87,7 @@
         this.targetQueue = targetQueue;
         this.mailKey = mailKey;
         this.targetProcessor = targetProcessor;
+        this.forRecipient = forRecipient;
         this.timestamp = timestamp;
     }
 
@@ -109,6 +116,10 @@
         return timestamp;
     }
 
+    public Optional<String> getForRecipient() {
+        return forRecipient;
+    }
+
     public Optional<String> getTargetProcessor() {
         return targetProcessor;
     }
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
index af0016f..eac8a86 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
@@ -21,6 +21,7 @@
 import java.time.Clock;
 import java.util.Optional;
 
+import org.apache.james.core.MailAddress;
 import org.apache.james.json.DTOModule;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
@@ -30,6 +31,7 @@
 import org.apache.james.util.streams.Limit;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
 
 public class ReprocessingOneMailTaskDTO implements TaskDTO {
 
@@ -53,6 +55,7 @@
                 domainObject.getConfiguration().getMailQueueName().asString(),
                 domainObject.getMailKey().asString(),
                 domainObject.getConfiguration().getTargetProcessor(),
+                domainObject.getConfiguration().getForRecipient().map(MailAddress::asString),
                 Optional.of(domainObject.getConfiguration().isConsume()));
         } catch (Exception e) {
             throw new ReprocessingOneMailTask.UrlEncodingFailureSerializationException(domainObject.getRepositoryPath());
@@ -64,6 +67,7 @@
     private final String targetQueue;
     private final String mailKey;
     private final Optional<String> targetProcessor;
+    private final Optional<String> forRecipient;
     private final boolean consume;
 
     public ReprocessingOneMailTaskDTO(@JsonProperty("type") String type,
@@ -71,12 +75,14 @@
                                       @JsonProperty("targetQueue") String targetQueue,
                                       @JsonProperty("mailKey") String mailKey,
                                       @JsonProperty("targetProcessor") Optional<String> targetProcessor,
+                                      @JsonProperty("forRecipient") Optional<String> forRecipient,
                                       @JsonProperty("boolean") Optional<Boolean> consume) {
         this.type = type;
         this.repositoryPath = repositoryPath;
         this.mailKey = mailKey;
         this.targetQueue = targetQueue;
         this.targetProcessor = targetProcessor;
+        this.forRecipient = forRecipient;
         this.consume = consume.orElse(true);
     }
 
@@ -88,6 +94,7 @@
                 MailQueueName.of(targetQueue),
                 targetProcessor,
                 NO_MAX_RETRIES,
+                forRecipient.map(Throwing.function(MailAddress::new)),
                 consume,
                 Limit.unlimited()),
             new MailKey(mailKey),
@@ -123,6 +130,10 @@
         return consume;
     }
 
+    public Optional<String> getForRecipient() {
+        return forRecipient;
+    }
+
     public Optional<String> getTargetProcessor() {
         return targetProcessor;
     }
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
index c1f0441..730c0f8 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
@@ -31,6 +31,7 @@
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
+import org.apache.james.core.MailAddress;
 import org.apache.james.lifecycle.api.LifecycleUtil;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepository;
@@ -69,17 +70,27 @@
         private final MailQueueName mailQueueName;
         private final Optional<String> targetProcessor;
         private final Optional<Integer> maxRetries;
+        private final Optional<MailAddress> forRecipient;
         private final boolean consume;
         private final Limit limit;
 
-        public Configuration(MailQueueName mailQueueName, Optional<String> targetProcessor, Optional<Integer> maxRetries, boolean consume, Limit limit) {
+        public Configuration(MailQueueName mailQueueName, Optional<String> targetProcessor, Optional<Integer> maxRetries, Optional<MailAddress> forRecipient, boolean consume, Limit limit) {
             this.mailQueueName = mailQueueName;
             this.targetProcessor = targetProcessor;
             this.maxRetries = maxRetries;
+            this.forRecipient = forRecipient;
             this.consume = consume;
             this.limit = limit;
         }
 
+        public Configuration(MailQueueName mailQueueName, Optional<String> targetProcessor, Optional<Integer> maxRetries, boolean consume, Limit limit) {
+            this(mailQueueName, targetProcessor, maxRetries, Optional.empty(), consume, limit);
+        }
+
+        public Optional<MailAddress> getForRecipient() {
+            return forRecipient;
+        }
+
         public MailQueueName getMailQueueName() {
             return mailQueueName;
         }
@@ -178,11 +189,16 @@
                 .doOnNext(keyListener)
                 .flatMap(mailKey -> Mono.fromCallable(() -> repository.retrieve(mailKey))
                     .map(mail -> Triple.of(mail, repository, mailKey)))
-                .filter(triple -> !reprocessor.retryExceeded(triple.getLeft())))))
+                .filter(triple -> !reprocessor.retryExceeded(triple.getLeft()))))
+                .filter(triple -> filterRecipients(configuration, triple)))
             .flatMap(triple -> reprocess(triple.getRight(), triple.getLeft(), triple.getMiddle(), reprocessor))
             .reduce(Task.Result.COMPLETED, Task::combine);
     }
 
+    private static Boolean filterRecipients(Configuration configuration, Triple<Mail, MailRepository, MailKey> triple) {
+        return configuration.forRecipient.map(rcpt -> triple.getLeft().getRecipients().contains(rcpt)).orElse(true);
+    }
+
     private Mono<Task.Result> reprocess(MailKey key, Mail mail, MailRepository repository, Reprocessor reprocessor) {
         return Mono.fromRunnable(() -> reprocessor.reprocess(repository, mail, key))
             .thenReturn(Task.Result.COMPLETED)
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
index 94e2cff..85a46e4 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
@@ -1141,6 +1141,39 @@
     }
 
     @Test
+    void reprocessingAllTaskShouldAllowFilteringByRecipient() throws Exception {
+        MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
+        String recipient1 = "recipient1@domain";
+        String recipient2 = "recipient2@domain";
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .recipient(recipient1)
+            .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES))
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .recipient(recipient2)
+            .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES))
+            .build());
+
+        String taskId = with()
+            .param("action", "reprocess")
+            .param("forRecipient", recipient1)
+            .patch(PATH_ESCAPED_MY_REPO + "/mails")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await");
+
+        assertThat(mailRepository.list())
+            .toIterable()
+            .containsOnly(new MailKey(NAME_2));
+    }
+
+    @Test
     void reprocessingAllTaskShouldIncludeDetails() throws Exception {
         MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
         String name1 = "name1";