/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.transport.mailets;
import static org.apache.james.mailets.configuration.CommonProcessors.ERROR_REPOSITORY;
import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP;
import static org.apache.james.mailets.configuration.Constants.PASSWORD;
import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.stream.IntStream;
import jakarta.mail.internet.MimeMessage;
import org.apache.james.core.Username;
import org.apache.james.core.builder.MimeMessageBuilder;
import org.apache.james.jmap.MessageIdProbe;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageResult;
import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
import org.apache.james.mailbox.model.SearchQuery;
import org.apache.james.mailets.TemporaryJamesServer;
import org.apache.james.mailets.configuration.CommonProcessors;
import org.apache.james.mailets.configuration.MailetConfiguration;
import org.apache.james.mailets.configuration.MailetContainer;
import org.apache.james.mailets.configuration.ProcessorConfiguration;
import org.apache.james.modules.MailboxProbeImpl;
import org.apache.james.modules.protocols.ImapGuiceProbe;
import org.apache.james.modules.protocols.SmtpGuiceProbe;
import org.apache.james.rate.limiter.memory.MemoryRateLimiterModule;
import org.apache.james.transport.matchers.All;
import org.apache.james.utils.DataProbeImpl;
import org.apache.james.utils.SMTPMessageSender;
import org.apache.james.utils.TestIMAPClient;
import org.apache.mailet.base.test.FakeMail;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import com.github.fge.lambdas.Throwing;
/**
 * Integration test combining the {@link PerSenderRateLimit} and {@link Requeue} mailets.
 *
 * <p>The transport processor is configured with a per-sender limit of one email per
 * three-second window; emails exceeding it are routed to the {@code tooMuchEmails}
 * processor, where {@link Requeue} is configured with a four-second delay. The tests
 * check that every email sent is eventually present in the recipient's mailbox.
 */
class RequeueThrottlingIntegrationTest {
    private static final String SENDER = "sender1@" + DEFAULT_DOMAIN;
    private static final String RECIPIENT1 = "recipient1@" + DEFAULT_DOMAIN;

    private TemporaryJamesServer jamesServer;
    private MimeMessage message;

    @RegisterExtension
    public TestIMAPClient testIMAPClient = new TestIMAPClient();
    @RegisterExtension
    public SMTPMessageSender messageSender = new SMTPMessageSender(DEFAULT_DOMAIN);

    /**
     * Starts a James server with the rate-limiting / requeue mailet container,
     * provisions the sender and recipient accounts and builds the email used by the tests.
     *
     * @param temporaryFolder working directory handed to the server under test
     * @throws Exception if the server cannot be built, started or provisioned
     */
    @BeforeEach
    void setup(@TempDir File temporaryFolder) throws Exception {
        MailetContainer.Builder mailetContainer = TemporaryJamesServer.simpleMailetContainerConfiguration()
            // Anything reaching the error processor is stored in the error repository.
            .putProcessor(ProcessorConfiguration.error()
                .enableJmx(false)
                .addMailet(MailetConfiguration.builder()
                    .matcher(All.class)
                    .mailet(ToRepository.class)
                    .addProperty("repositoryPath", ERROR_REPOSITORY.asString()))
                .build())
            // Rate limit is applied first, then the regular transport mailets.
            .putProcessor(ProcessorConfiguration.transport()
                .addMailet(MailetConfiguration.builder()
                    .matcher(All.class)
                    .mailet(PerSenderRateLimit.class)
                    .addProperty("duration", "3s")
                    .addProperty("precision", "1s")
                    .addProperty("count", "1")
                    .addProperty("exceededProcessor", "tooMuchEmails")
                    .build())
                .addMailetsFrom(CommonProcessors.transport()))
            // Emails over the limit are handed to Requeue with a delay longer than the rate-limit window.
            .putProcessor(ProcessorConfiguration.builder()
                .state("tooMuchEmails")
                .addMailet(MailetConfiguration.builder()
                    .matcher(All.class)
                    .mailet(Requeue.class)
                    .addProperty("delay", "4s")
                    .build())
                .build());

        jamesServer = TemporaryJamesServer.builder()
            .withMailetContainer(mailetContainer)
            .withOverrides(new MemoryRateLimiterModule())
            .build(temporaryFolder);
        jamesServer.start();

        jamesServer.getProbe(DataProbeImpl.class).fluent()
            .addDomain(DEFAULT_DOMAIN)
            .addUser(SENDER, PASSWORD)
            .addUser(RECIPIENT1, PASSWORD);

        message = MimeMessageBuilder.mimeMessageBuilder()
            .setSubject("test")
            .setText("text1")
            .build();
    }

    /**
     * Stops the server started by {@link #setup(File)}.
     */
    @AfterEach
    void tearDown() {
        // setup() may have failed before the server was assigned; do not hide that failure behind an NPE.
        if (jamesServer != null) {
            jamesServer.shutdown();
        }
    }

    /**
     * Sends two emails from the same sender and checks that both are delivered,
     * with more than three seconds between their internal dates.
     */
    @Test
    void throttlingShouldWork() throws Exception {
        sendMessageFromSender();

        // Wait until the first message has been delivered to the recipient's INBOX.
        testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort())
            .login(RECIPIENT1, PASSWORD)
            .select(TestIMAPClient.INBOX)
            .awaitMessage(awaitAtMostOneMinute);

        // The second message should exceed the count limit and be re-enqueued.
        sendMessageFromSender();

        // Then the second message should be delivered to the recipient after the delay.
        MailboxProbeImpl mailboxProbe = jamesServer.getProbe(MailboxProbeImpl.class);
        MultimailboxesSearchQuery allMessages = MultimailboxesSearchQuery.from(SearchQuery.of(SearchQuery.all())).build();
        awaitAtMostOneMinute.until(() -> mailboxProbe.searchMessage(allMessages, RECIPIENT1, Integer.MAX_VALUE).size() == 2);

        // Compare the internal dates of both emails to make sure the second one was throttled.
        ArrayList<MessageId> messageIds = new ArrayList<>(mailboxProbe.searchMessage(allMessages, RECIPIENT1, Integer.MAX_VALUE));
        Username recipient = Username.of(RECIPIENT1);
        MessageIdProbe messageIdProbe = jamesServer.getProbe(MessageIdProbe.class);
        MessageResult firstMessage = messageIdProbe.getMessages(messageIds.get(0), recipient).get(0);
        MessageResult secondMessage = messageIdProbe.getMessages(messageIds.get(1), recipient).get(0);

        // abs(): the comparison does not rely on the order in which the search returned the two ids.
        assertThat(Duration.between(
                firstMessage.getInternalDate().toInstant(),
                secondMessage.getInternalDate().toInstant()).abs())
            .isGreaterThan(Duration.ofSeconds(3));
    }

    /**
     * Sends three emails in a row from the same sender and checks that all three
     * eventually show up in the recipient's INBOX.
     */
    @Test
    void throttlingShouldWorkWhenSeveralMessagesIsRateLimitExceeded() throws Exception {
        IntStream.range(1, 4)
            .forEach(i -> Throwing.runnable(this::sendMessageFromSender).sneakyThrow().run());

        // Open a single IMAP session up front: connecting inside the polled condition
        // would re-connect, re-login and re-await on every poll iteration.
        testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort())
            .login(RECIPIENT1, PASSWORD)
            .select(TestIMAPClient.INBOX)
            .awaitMessage(awaitAtMostOneMinute);

        // Only the message count is polled until all three emails have arrived.
        awaitAtMostOneMinute.until(() -> testIMAPClient.getMessageCount(TestIMAPClient.INBOX) == 3);
    }

    /**
     * Connects to the SMTP port of the server under test, authenticates as {@link #SENDER}
     * and sends the test email to {@link #RECIPIENT1}.
     *
     * @throws Exception if connecting, authenticating or sending fails
     */
    private void sendMessageFromSender() throws Exception {
        messageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
            .authenticate(SENDER, PASSWORD)
            .sendMessage(FakeMail.builder()
                .name("name")
                .mimeMessage(message)
                .sender(SENDER)
                .recipient(RECIPIENT1));
    }
}