[ENHANCEMENT] Reprocess for a specific recipient (#2226)
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
index 8c74e23..0d5aef8 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
@@ -33,6 +33,7 @@
import jakarta.mail.internet.MimeMessage;
import org.apache.commons.io.output.CountingOutputStream;
+import org.apache.james.core.MailAddress;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.mailrepository.api.MailRepositoryStore;
@@ -62,6 +63,7 @@
import org.apache.james.webadmin.utils.Responses;
import org.eclipse.jetty.http.HttpStatus;
+import com.github.fge.lambdas.Throwing;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -331,6 +333,7 @@
return new ReprocessingService.Configuration(parseTargetQueue(request),
parseTargetProcessor(request),
parseMaxRetries(request),
+ parseForRecipient(request),
parseConsume(request).orElse(true),
parseLimit(request));
}
@@ -363,6 +366,11 @@
return Optional.ofNullable(request.queryParams("processor"));
}
+ private Optional<MailAddress> parseForRecipient(Request request) {
+ return Optional.ofNullable(request.queryParams("forRecipient"))
+ .map(Throwing.function(MailAddress::new));
+ }
+
private Optional<Boolean> parseConsume(Request request) {
return Optional.ofNullable(request.queryParams("consume"))
.map(Boolean::parseBoolean);
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
index 312996a..dc8f02b 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
@@ -21,6 +21,7 @@
import java.time.Instant;
import java.util.Optional;
+import org.apache.james.core.MailAddress;
import org.apache.james.json.DTOModule;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.queue.api.MailQueueName;
@@ -29,6 +30,7 @@
import org.apache.james.util.streams.Limit;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
public class ReprocessingAllMailsTaskAdditionalInformationDTO implements AdditionalInformationDTO {
public static AdditionalInformationDTOModule<ReprocessingAllMailsTask.AdditionalInformation, ReprocessingAllMailsTaskAdditionalInformationDTO> module() {
@@ -40,6 +42,7 @@
MailQueueName.of(dto.getTargetQueue()),
dto.getTargetProcessor(),
dto.getMaxRetries(),
+ dto.forRecipient.map(Throwing.function(MailAddress::new)),
dto.isConsume(),
Limit.from(dto.getLimit())),
dto.initialCount,
@@ -50,6 +53,7 @@
details.getRepositoryPath(),
details.getConfiguration().getMailQueueName().asString(),
details.getConfiguration().getTargetProcessor(),
+ details.getConfiguration().getForRecipient().map(MailAddress::asString),
Optional.of(details.getConfiguration().isConsume()),
details.getInitialCount(),
details.getRemainingCount(),
@@ -64,6 +68,7 @@
private final String repositoryPath;
private final String targetQueue;
private final Optional<String> targetProcessor;
+ private final Optional<String> forRecipient;
private final boolean consume;
private final long initialCount;
private final long remainingCount;
@@ -76,6 +81,7 @@
@JsonProperty("repositoryPath") String repositoryPath,
@JsonProperty("targetQueue") String targetQueue,
@JsonProperty("targetProcessor") Optional<String> targetProcessor,
+ @JsonProperty("forRecipient") Optional<String> forRecipient,
@JsonProperty("consume") Optional<Boolean> consume,
@JsonProperty("initialCount") long initialCount,
@JsonProperty("remainingCount") long remainingCount,
@@ -86,6 +92,7 @@
this.repositoryPath = repositoryPath;
this.targetQueue = targetQueue;
this.targetProcessor = targetProcessor;
+ this.forRecipient = forRecipient;
this.initialCount = initialCount;
this.remainingCount = remainingCount;
this.timestamp = timestamp;
@@ -131,6 +138,10 @@
return targetProcessor;
}
+    /**
+     * @return the recipient filter in its string form, as carried by the {@code forRecipient}
+     *         JSON property of this DTO; empty when no filter was set
+     */
+    public Optional<String> getForRecipient() {
+        return this.forRecipient;
+    }
+
public Optional<Integer> getLimit() {
return limit;
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
index 7784e5c..bdc2066 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
@@ -20,6 +20,7 @@
import java.util.Optional;
+import org.apache.james.core.MailAddress;
import org.apache.james.json.DTOModule;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.queue.api.MailQueueName;
@@ -28,6 +29,7 @@
import org.apache.james.util.streams.Limit;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
public class ReprocessingAllMailsTaskDTO implements TaskDTO {
@@ -50,6 +52,7 @@
domainObject.getConfiguration().getMailQueueName().asString(),
Optional.of(domainObject.getConfiguration().isConsume()),
domainObject.getConfiguration().getTargetProcessor(),
+ domainObject.getConfiguration().getForRecipient().map(MailAddress::asString),
domainObject.getConfiguration().getLimit().getLimit(),
domainObject.getConfiguration().getMaxRetries());
} catch (Exception e) {
@@ -63,6 +66,7 @@
private final String targetQueue;
private final boolean consume;
private final Optional<String> targetProcessor;
+ private final Optional<String> forRecipient;
private final Optional<Integer> limit;
private final Optional<Integer> maxRetries;
@@ -72,6 +76,7 @@
@JsonProperty("targetQueue") String targetQueue,
@JsonProperty("consume") Optional<Boolean> consume,
@JsonProperty("targetProcessor") Optional<String> targetProcessor,
+ @JsonProperty("forRecipient") Optional<String> forRecipient,
@JsonProperty("limit") Optional<Integer> limit,
@JsonProperty("maxRetries") Optional<Integer> maxRetries) {
this.type = type;
@@ -80,6 +85,7 @@
this.targetQueue = targetQueue;
this.consume = consume.orElse(true);
this.targetProcessor = targetProcessor;
+ this.forRecipient = forRecipient;
this.limit = limit;
this.maxRetries = maxRetries;
}
@@ -94,6 +100,7 @@
MailQueueName.of(targetQueue),
targetProcessor,
maxRetries,
+ forRecipient.map(Throwing.function(MailAddress::new)),
consume,
Limit.from(limit)));
} catch (Exception e) {
@@ -126,6 +133,10 @@
return consume;
}
+    /**
+     * @return the recipient filter in its string form, as carried by the {@code forRecipient}
+     *         JSON property of this task DTO; empty when no filter was set
+     */
+    public Optional<String> getForRecipient() {
+        return this.forRecipient;
+    }
+
public Optional<String> getTargetProcessor() {
return targetProcessor;
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
index 74d3809..a6c1520 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
@@ -21,6 +21,7 @@
import java.time.Instant;
import java.util.Optional;
+import org.apache.james.core.MailAddress;
import org.apache.james.json.DTOModule;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepositoryPath;
@@ -30,6 +31,7 @@
import org.apache.james.util.streams.Limit;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
public class ReprocessingOneMailTaskAdditionalInformationDTO implements AdditionalInformationDTO {
@@ -44,6 +46,7 @@
MailQueueName.of(dto.targetQueue),
dto.targetProcessor,
NO_MAX_RETRIES,
+ dto.forRecipient.map(Throwing.function(MailAddress::new)),
dto.isConsume(),
Limit.unlimited()),
new MailKey(dto.mailKey),
@@ -55,6 +58,7 @@
details.getMailKey(),
Optional.of(details.getConfiguration().isConsume()),
details.getConfiguration().getTargetProcessor(),
+ details.getConfiguration().getForRecipient().map(MailAddress::asString),
details.timestamp()))
.typeName(ReprocessingOneMailTask.TYPE.asString())
.withFactory(AdditionalInformationDTOModule::new);
@@ -65,6 +69,7 @@
private final String targetQueue;
private final String mailKey;
private final Optional<String> targetProcessor;
+ private final Optional<String> forRecipient;
private final boolean consume;
private final Instant timestamp;
@@ -74,6 +79,7 @@
@JsonProperty("mailKey") String mailKey,
@JsonProperty("consume") Optional<Boolean> consume,
@JsonProperty("targetProcessor") Optional<String> targetProcessor,
+ @JsonProperty("forRecipient") Optional<String> forRecipient,
@JsonProperty("timestamp") Instant timestamp) {
this.type = type;
this.consume = consume.orElse(true);
@@ -81,6 +87,7 @@
this.targetQueue = targetQueue;
this.mailKey = mailKey;
this.targetProcessor = targetProcessor;
+ this.forRecipient = forRecipient;
this.timestamp = timestamp;
}
@@ -109,6 +116,10 @@
return timestamp;
}
+    /**
+     * @return the recipient filter in its string form, as carried by the {@code forRecipient}
+     *         JSON property of this DTO; empty when no filter was set
+     */
+    public Optional<String> getForRecipient() {
+        return this.forRecipient;
+    }
+
public Optional<String> getTargetProcessor() {
return targetProcessor;
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
index af0016f..eac8a86 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
@@ -21,6 +21,7 @@
import java.time.Clock;
import java.util.Optional;
+import org.apache.james.core.MailAddress;
import org.apache.james.json.DTOModule;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepositoryPath;
@@ -30,6 +31,7 @@
import org.apache.james.util.streams.Limit;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
public class ReprocessingOneMailTaskDTO implements TaskDTO {
@@ -53,6 +55,7 @@
domainObject.getConfiguration().getMailQueueName().asString(),
domainObject.getMailKey().asString(),
domainObject.getConfiguration().getTargetProcessor(),
+ domainObject.getConfiguration().getForRecipient().map(MailAddress::asString),
Optional.of(domainObject.getConfiguration().isConsume()));
} catch (Exception e) {
throw new ReprocessingOneMailTask.UrlEncodingFailureSerializationException(domainObject.getRepositoryPath());
@@ -64,6 +67,7 @@
private final String targetQueue;
private final String mailKey;
private final Optional<String> targetProcessor;
+ private final Optional<String> forRecipient;
private final boolean consume;
public ReprocessingOneMailTaskDTO(@JsonProperty("type") String type,
@@ -71,12 +75,14 @@
@JsonProperty("targetQueue") String targetQueue,
@JsonProperty("mailKey") String mailKey,
@JsonProperty("targetProcessor") Optional<String> targetProcessor,
+ @JsonProperty("forRecipient") Optional<String> forRecipient,
@JsonProperty("boolean") Optional<Boolean> consume) {
this.type = type;
this.repositoryPath = repositoryPath;
this.mailKey = mailKey;
this.targetQueue = targetQueue;
this.targetProcessor = targetProcessor;
+ this.forRecipient = forRecipient;
this.consume = consume.orElse(true);
}
@@ -88,6 +94,7 @@
MailQueueName.of(targetQueue),
targetProcessor,
NO_MAX_RETRIES,
+ forRecipient.map(Throwing.function(MailAddress::new)),
consume,
Limit.unlimited()),
new MailKey(mailKey),
@@ -123,6 +130,10 @@
return consume;
}
+    /**
+     * @return the recipient filter in its string form, as carried by the {@code forRecipient}
+     *         JSON property of this task DTO; empty when no filter was set
+     */
+    public Optional<String> getForRecipient() {
+        return this.forRecipient;
+    }
+
public Optional<String> getTargetProcessor() {
return targetProcessor;
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
index c1f0441..730c0f8 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
@@ -31,6 +31,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
+import org.apache.james.core.MailAddress;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepository;
@@ -69,17 +70,27 @@
private final MailQueueName mailQueueName;
private final Optional<String> targetProcessor;
private final Optional<Integer> maxRetries;
+ private final Optional<MailAddress> forRecipient;
private final boolean consume;
private final Limit limit;
- public Configuration(MailQueueName mailQueueName, Optional<String> targetProcessor, Optional<Integer> maxRetries, boolean consume, Limit limit) {
+ public Configuration(MailQueueName mailQueueName, Optional<String> targetProcessor, Optional<Integer> maxRetries, Optional<MailAddress> forRecipient, boolean consume, Limit limit) { // forRecipient: optional recipient filter; empty means no filtering by recipient
this.mailQueueName = mailQueueName;
this.targetProcessor = targetProcessor;
this.maxRetries = maxRetries;
+ this.forRecipient = forRecipient; // when present, filterRecipients keeps only mails having this address among their recipients
this.consume = consume;
this.limit = limit;
}
+        /**
+         * Builds a configuration that does not filter by recipient.
+         * Delegates to the six-argument constructor with an empty {@code forRecipient}.
+         */
+        public Configuration(MailQueueName mailQueueName, Optional<String> targetProcessor, Optional<Integer> maxRetries, boolean consume, Limit limit) {
+            this(
+                mailQueueName,
+                targetProcessor,
+                maxRetries,
+                Optional.empty(),
+                consume,
+                limit);
+        }
+
+        /**
+         * @return the recipient used to filter the mails to reprocess; empty when no
+         *         recipient filter was configured
+         */
+        public Optional<MailAddress> getForRecipient() {
+            return this.forRecipient;
+        }
+
public MailQueueName getMailQueueName() {
return mailQueueName;
}
@@ -178,11 +189,16 @@
.doOnNext(keyListener)
.flatMap(mailKey -> Mono.fromCallable(() -> repository.retrieve(mailKey))
.map(mail -> Triple.of(mail, repository, mailKey)))
- .filter(triple -> !reprocessor.retryExceeded(triple.getLeft())))))
+ .filter(triple -> !reprocessor.retryExceeded(triple.getLeft()))))
+ .filter(triple -> filterRecipients(configuration, triple)))
.flatMap(triple -> reprocess(triple.getRight(), triple.getLeft(), triple.getMiddle(), reprocessor))
.reduce(Task.Result.COMPLETED, Task::combine);
}
+    /**
+     * Tells whether a retrieved mail passes the optional recipient filter of the configuration.
+     *
+     * @param configuration reprocessing configuration holding the optional {@code forRecipient} filter
+     * @param triple the retrieved mail (left) together with its repository (middle) and key (right)
+     * @return {@code true} when no recipient filter is configured, or when the configured address
+     *         is one of the mail's recipients; {@code false} otherwise
+     */
+    private static boolean filterRecipients(Configuration configuration, Triple<Mail, MailRepository, MailKey> triple) {
+        // Primitive return type: the result is only used as a predicate, boxing adds nothing.
+        return configuration.getForRecipient()
+            .map(recipient -> triple.getLeft().getRecipients().contains(recipient))
+            .orElse(true);
+    }
+
private Mono<Task.Result> reprocess(MailKey key, Mail mail, MailRepository repository, Reprocessor reprocessor) {
return Mono.fromRunnable(() -> reprocessor.reprocess(repository, mail, key))
.thenReturn(Task.Result.COMPLETED)
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
index 94e2cff..85a46e4 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
@@ -1141,6 +1141,39 @@
}
@Test
+    void reprocessingAllTaskShouldAllowFilteringByRecipient() throws Exception {
+        // Two mails in the same repository, each addressed to a different recipient.
+        String matchingRecipient = "recipient1@domain";
+        String otherRecipient = "recipient2@domain";
+        MailRepository repository = mailRepositoryStore.create(URL_MY_REPO);
+        repository.store(FakeMail.builder()
+            .name(NAME_1)
+            .recipient(matchingRecipient)
+            .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES))
+            .build());
+        repository.store(FakeMail.builder()
+            .name(NAME_2)
+            .recipient(otherRecipient)
+            .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES))
+            .build());
+
+        // Ask for a reprocessing restricted to the first recipient.
+        String reprocessingTaskId = with()
+            .param("action", "reprocess")
+            .param("forRecipient", matchingRecipient)
+            .patch(PATH_ESCAPED_MY_REPO + "/mails")
+            .jsonPath()
+            .get("taskId");
+
+        // Block until the task is over before inspecting the repository.
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(reprocessingTaskId + "/await");
+
+        // Only the mail addressed to the other recipient is expected to remain.
+        MailKey untouchedMail = new MailKey(NAME_2);
+        assertThat(repository.list())
+            .toIterable()
+            .containsOnly(untouchedMail);
+    }
+
+ @Test
void reprocessingAllTaskShouldIncludeDetails() throws Exception {
MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
String name1 = "name1";