blob: 55fa07907d90117af91c6bdb7f3501e9c393f805 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.queue.rabbitmq;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static java.time.temporal.ChronoUnit.DAYS;
import static java.time.temporal.ChronoUnit.HOURS;
import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
import static org.apache.james.backends.cassandra.StatementRecorder.Selector.preparedStatementStartingWith;
import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.queue.api.Mails.defaultMail;
import static org.apache.james.queue.api.Mails.defaultMailNoRecipient;
import static org.apache.mailet.base.MailAddressFixture.RECIPIENT1;
import static org.apache.mailet.base.MailAddressFixture.SENDER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.TEN_SECONDS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.StatementRecorder;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.BlobTables;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.core.builder.MimeMessageBuilder;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.metrics.api.Gauge;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueueMetricContract;
import org.apache.james.queue.api.MailQueueMetricExtension;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueueContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory;
import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
import org.apache.james.util.streams.Iterators;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.mailet.Mail;
import org.apache.mailet.base.test.FakeMail;
import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import com.github.fge.lambdas.Throwing;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
class RabbitMQMailQueueTest {
// Blob id factory handed to the RabbitMQ mail queue private factory in setUp(...).
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
// Settings of the Cassandra mail queue view built in setUp(...): bucket count, browse-start update pace, slice window.
private static final int THREE_BUCKET_COUNT = 3;
private static final int UPDATE_BROWSE_START_PACE = 25;
private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1);
// Fully qualified on purpose: the unqualified MailQueueName used further down is not imported,
// so it resolves to the type declared in this test's own package.
private static final org.apache.james.queue.api.MailQueueName SPOOL = org.apache.james.queue.api.MailQueueName.of("spool");
// Reference instants: IN_SLICE_1 is 60 days before class initialisation, the others are whole hours later
// (IN_SLICE_n = IN_SLICE_1 + (n - 1) hours), i.e. one slice window apart per step.
private static final Instant IN_SLICE_1 = Instant.now().minus(60, DAYS);
private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1, HOURS);
private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS);
private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6, HOURS);
// Shared executor wrapped by Schedulers.fromExecutor(...) in the tests; this class never shuts it down.
public static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
// Cassandra extension set up with the blob, mail queue view, event store and schema version modules.
@RegisterExtension
static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
CassandraBlobModule.MODULE,
CassandraMailQueueViewModule.MODULE,
CassandraEventStoreModule.MODULE(),
CassandraSchemaVersionModule.MODULE));
// Singleton RabbitMQ extension configured with the WEAK isolation policy.
@RegisterExtension
static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
.isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
// Per-test state: mailQueueFactory, clock, mailQueue and mqManagementApi are all (re)assigned by setUp(...).
private RabbitMQMailQueueFactory mailQueueFactory;
private UpdatableTickingClock clock;
private RabbitMQMailQueue mailQueue;
private RabbitMQMailQueueManagement mqManagementApi;
/**
 * Asks the RabbitMQ management wrapper to delete all queues once a test has finished.
 *
 * <p>The wrapper is only assigned by {@code setUp(...)}. When that set-up step failed before
 * the assignment the field is still {@code null}; the clean-up is then skipped so that the
 * original failure is not accompanied by an unrelated {@link NullPointerException}.
 */
@AfterEach
void tearDown() {
    if (mqManagementApi != null) {
        mqManagementApi.deleteAllQueues();
    }
}
@Nested
class MailQueueSizeMetricsEnabled implements ManageableMailQueueContract, MailQueueMetricContract {
/**
 * Builds the queue under test with size metrics enabled, a passthrough blob store and a
 * prefetch count of 3.
 */
@BeforeEach
void setup(CassandraCluster cassandra,
           MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
    RabbitMQMailQueueConfiguration sizeMetricsOn = RabbitMQMailQueueConfiguration.builder()
        .sizeMetricsEnabled(true)
        .build();
    BlobStore passthroughBlobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
        .passthrough();

    setUp(cassandra, metricTestSystem, sizeMetricsOn, passthroughBlobStore, MailQueueFactory.prefetchCount(3));
}
/**
 * Enqueues through the contract's default implementation, then ticks the test clock once
 * after every enqueue.
 */
@Override
public void enQueue(Mail mail) throws MailQueue.MailQueueException {
    ManageableMailQueueContract.super.enQueue(mail);
    clock.tick();
}
/** Exposes the queue created by {@code setUp(...)} to the contract tests. */
@Override
public RabbitMQMailQueue getMailQueue() {
    return mailQueue;
}
/** Exposes the same queue instance as {@link #getMailQueue()}, typed as a manageable queue. */
@Override
public ManageableMailQueue getManageableMailQueue() {
    return mailQueue;
}
/**
 * Enqueues five mails at each of the instants for slices 1, 2, 3 and 5, moves the clock to the
 * slice 7 instant and expects browsing to list all twenty mails in enqueue order.
 */
@Test
void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception {
    int mailsPerSlice = 5;

    clock.setInstant(IN_SLICE_1);
    enqueueSomeMails(namePatternForSlice(1), mailsPerSlice);

    clock.setInstant(IN_SLICE_2);
    enqueueSomeMails(namePatternForSlice(2), mailsPerSlice);

    clock.setInstant(IN_SLICE_3);
    enqueueSomeMails(namePatternForSlice(3), mailsPerSlice);

    // Nothing is enqueued at the instants that would correspond to slices 4 and 6.
    clock.setInstant(IN_SLICE_5);
    enqueueSomeMails(namePatternForSlice(5), mailsPerSlice);

    clock.setInstant(IN_SLICE_7);

    Stream<String> browsedNames = Iterators.toStream(getManageableMailQueue().browse())
        .map(view -> view.getMail().getName());

    assertThat(browsedNames).containsExactly(
        "1-1", "1-2", "1-3", "1-4", "1-5",
        "2-1", "2-2", "2-3", "2-4", "2-5",
        "3-1", "3-2", "3-3", "3-4", "3-5",
        "5-1", "5-2", "5-3", "5-4", "5-5");
}
/**
 * Records the prepared statements starting with "UPDATE browsestart" while 250 mails are
 * enqueued then dequeued in each of three consecutive slices, and checks how many were issued.
 */
@Test
void browseStartShouldBeUpdated(CassandraCluster cassandraCluster, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
    // Rebuild the queue with a prefetch count of 1 instead of the 3 used by the @BeforeEach set-up.
    RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder()
        .sizeMetricsEnabled(true)
        .build();
    BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandraCluster.getConf(), new RecordingMetricFactory())
        .passthrough();
    setUp(cassandraCluster, metricTestSystem, configuration, blobStore, MailQueueFactory.prefetchCount(1));

    int emailCount = 250;
    StatementRecorder.Selector browseStartUpdates = preparedStatementStartingWith("UPDATE browsestart");
    StatementRecorder recorder = cassandraCluster.getConf()
        .recordStatements(browseStartUpdates);

    Instant[] sliceInstants = {IN_SLICE_1, IN_SLICE_2, IN_SLICE_3};
    for (int index = 0; index < sliceInstants.length; index++) {
        clock.setInstant(sliceInstants[index]);
        enqueueSomeMails(namePatternForSlice(index + 1), emailCount);
        dequeueMails(emailCount);
    }

    // The actual rate of update should be lower than the update probability.
    assertThat(recorder.listExecutedStatements(browseStartUpdates))
        .hasSizeBetween(2, 12);
}
/**
 * Records the prepared statements starting with "UPDATE contentstart" while 250 mails are
 * enqueued then dequeued in each of three consecutive slices, and checks how many were issued.
 */
@Test
void contentStartShouldBeUpdated(CassandraCluster cassandraCluster, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
    // Rebuild the queue with a prefetch count of 1 instead of the 3 used by the @BeforeEach set-up.
    RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder()
        .sizeMetricsEnabled(true)
        .build();
    BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandraCluster.getConf(), new RecordingMetricFactory())
        .passthrough();
    setUp(cassandraCluster, metricTestSystem, configuration, blobStore, MailQueueFactory.prefetchCount(1));

    int emailCount = 250;
    StatementRecorder.Selector contentStartUpdates = preparedStatementStartingWith("UPDATE contentstart");
    StatementRecorder recorder = cassandraCluster.getConf().recordStatements(contentStartUpdates);

    Instant[] sliceInstants = {IN_SLICE_1, IN_SLICE_2, IN_SLICE_3};
    for (int index = 0; index < sliceInstants.length; index++) {
        clock.setInstant(sliceInstants[index]);
        enqueueSomeMails(namePatternForSlice(index + 1), emailCount);
        dequeueMails(emailCount);
    }

    // The actual rate of update should be lower than the update probability.
    assertThat(recorder.listExecutedStatements(contentStartUpdates))
        .hasSizeBetween(2, 12);
}
/**
 * Enqueues one mail, completes its dequeued item with SUCCESS and expects the default bucket
 * blob table to be empty afterwards.
 */
@Test
void dequeueShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
    String mailName = "myMail1";
    Flux<MailQueue.MailQueueItem> dequeued = Flux.from(getMailQueue().deQueue());

    getMailQueue().enQueue(defaultMail()
        .name(mailName)
        .build());

    // Complete the single dequeued item on the shared executor, waiting at most ten seconds.
    dequeued.take(1)
        .flatMap(item -> Mono.fromCallable(() -> {
                item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
                return item;
            })
            .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
        .blockLast(Duration.ofSeconds(10));

    assertThat(cassandra.getConf().execute(selectFrom(BlobTables.DefaultBucketBlobTable.TABLE_NAME)
            .all().build()))
        .isEmpty();
}
/** Enqueues one mail, clears the queue and expects the default bucket blob table to be empty. */
@Test
void clearShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
    String mailName = "myMail1";
    getMailQueue().enQueue(defaultMail()
        .name(mailName)
        .build());

    getManageableMailQueue().clear();

    assertThat(cassandra.getConf().execute(selectFrom(BlobTables.DefaultBucketBlobTable.TABLE_NAME)
            .all().build()))
        .isEmpty();
}
/** Enqueues one mail, removes it by name and expects the default bucket blob table to be empty. */
@Test
void removeByNameShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
    String mailName = "myMail1";
    getMailQueue().enQueue(defaultMail()
        .name(mailName)
        .build());

    getManageableMailQueue().remove(ManageableMailQueue.Type.Name, mailName);

    assertThat(cassandra.getConf().execute(selectFrom(BlobTables.DefaultBucketBlobTable.TABLE_NAME)
            .all().build()))
        .isEmpty();
}
/**
 * Enqueues one mail addressed to RECIPIENT1 only, removes by that recipient and expects the
 * default bucket blob table to be empty.
 */
@Test
void removeByRecipientShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
    String mailName = "myMail1";
    getMailQueue().enQueue(defaultMailNoRecipient()
        .name(mailName)
        .recipient(RECIPIENT1)
        .build());

    getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, RECIPIENT1.asString());

    assertThat(cassandra.getConf().execute(selectFrom(BlobTables.DefaultBucketBlobTable.TABLE_NAME)
            .all().build()))
        .isEmpty();
}
/**
 * Enqueues one mail sent by SENDER, removes by that sender and expects the default bucket blob
 * table to be empty.
 */
@Test
void removeBySenderShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
    String mailName = "myMail1";
    getMailQueue().enQueue(defaultMail()
        .name(mailName)
        .sender(SENDER)
        .build());

    getManageableMailQueue().remove(ManageableMailQueue.Type.Sender, SENDER.asString());

    assertThat(cassandra.getConf().execute(selectFrom(BlobTables.DefaultBucketBlobTable.TABLE_NAME)
            .all().build()))
        .isEmpty();
}
/**
 * Enqueues five mails at each of the instants for slices 1, 2, 3 and 5, dequeues thirteen of
 * them in three rounds and expects browsing to list the seven remaining mails in order.
 */
@Test
void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception {
    int mailsPerSlice = 5;

    clock.setInstant(IN_SLICE_1);
    enqueueSomeMails(namePatternForSlice(1), mailsPerSlice);

    clock.setInstant(IN_SLICE_2);
    enqueueSomeMails(namePatternForSlice(2), mailsPerSlice);

    clock.setInstant(IN_SLICE_3);
    enqueueSomeMails(namePatternForSlice(3), mailsPerSlice);

    clock.setInstant(IN_SLICE_5);
    enqueueSomeMails(namePatternForSlice(5), mailsPerSlice);

    clock.setInstant(IN_SLICE_7);

    // 5 + 5 + 3 = 13 mails acknowledged, leaving 3-4 onwards.
    dequeueMails(5);
    dequeueMails(5);
    dequeueMails(3);

    Stream<String> remainingNames = Iterators.toStream(getManageableMailQueue().browse())
        .map(view -> view.getMail().getName());

    assertThat(remainingNames)
        .containsExactly("3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5");
}
/**
 * Enqueues 100 mails at each of the instants for slices 1, 2, 3 and 5, dequeues all of them and
 * then checks that no enqueued-mail row is left for the first slice in any bucket.
 *
 * <p>The bucket ids checked follow {@code THREE_BUCKET_COUNT}, the bucket count the view is
 * configured with in {@code setUp(...)}, so the verification keeps covering every bucket if
 * that constant changes.
 */
@Test
void enqueuedEmailsShouldEventuallyBeCleaned() {
    ManageableMailQueue mailQueue = getManageableMailQueue();
    int emailCount = 100;

    clock.setInstant(IN_SLICE_1);
    enqueueSomeMails(namePatternForSlice(1), emailCount);
    clock.setInstant(IN_SLICE_2);
    enqueueSomeMails(namePatternForSlice(2), emailCount);
    clock.setInstant(IN_SLICE_3);
    enqueueSomeMails(namePatternForSlice(3), emailCount);
    clock.setInstant(IN_SLICE_5);
    enqueueSomeMails(namePatternForSlice(5), emailCount);
    clock.setInstant(IN_SLICE_7);

    // One dequeue round per batch enqueued above.
    dequeueMails(emailCount);
    dequeueMails(emailCount);
    dequeueMails(emailCount);
    dequeueMails(emailCount);

    // ensure slice 1 was cleaned, in every configured bucket
    EnqueuedMailsDAO mailsDAO = new EnqueuedMailsDAO(cassandraCluster.getCassandraCluster().getConf(), BLOB_ID_FACTORY);
    MailQueueName queueName = MailQueueName.fromString(mailQueue.getName().asString());
    Slice slice = Slice.of(currentSliceStartInstant(IN_SLICE_1));
    SoftAssertions.assertSoftly(softly -> {
        for (int bucket = 0; bucket < THREE_BUCKET_COUNT; bucket++) {
            softly.assertThat(mailsDAO.selectEnqueuedMails(queueName, slice, BucketedSlices.BucketId.of(bucket))
                    .collectList()
                    .block())
                .as("enqueued mails left in bucket %d of the first slice", bucket)
                .isEmpty();
        }
    });
}
/**
 * Rounds {@code instant} down (truncating towards the epoch) to a multiple of
 * {@code ONE_HOUR_SLICE_WINDOW}, with second precision.
 *
 * @param instant the instant to align
 * @return the aligned instant
 */
private Instant currentSliceStartInstant(Instant instant) {
    long windowSeconds = ONE_HOUR_SLICE_WINDOW.getSeconds();
    long epochSecond = instant.getEpochSecond();
    // Subtracting the remainder equals (epochSecond / windowSeconds) * windowSeconds.
    return Instant.ofEpochSecond(epochSecond - epochSecond % windowSeconds);
}
/**
 * Builds a mail-name generator producing {@code "<sliceId>-<index>"}.
 *
 * @param sliceId number used as the name prefix
 * @return a function mapping a mail index to its name
 */
private Function<Integer, String> namePatternForSlice(int sliceId) {
    String prefix = sliceId + "-";
    return index -> prefix + index;
}
/**
 * Creates a queue named "myQueue" through the factory and expects the Cassandra view test
 * factory to report it as initialized.
 */
@Test
void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandra) {
    org.apache.james.queue.api.MailQueueName apiName = org.apache.james.queue.api.MailQueueName.of("myQueue");

    mailQueueFactory.createQueue(apiName);

    MailQueueName rabbitName = MailQueueName.fromString(apiName.asString());
    assertThat(CassandraMailQueueViewTestFactory.isInitialized(cassandra.getConf(), rabbitName)).isTrue();
}
/**
 * Enqueues a mail whose name has a negative {@code hashCode()} and expects no exception.
 */
@Test
void enQueueShouldNotThrowOnMailNameWithNegativeHash() {
    // Keep this literal untouched, spelling included: the test relies on its exact hash.
    String negativeHashName = "this sting will have a negative hash"; //hash value: -1256871313

    assertThatCode(() -> getMailQueue().enQueue(defaultMail().name(negativeHashName).build()))
        .doesNotThrowAnyException();
}
/**
 * Overrides the inherited test with an empty body so that it can be disabled for this
 * implementation; see the {@code @Disabled} reason for the tracking ticket.
 */
@Disabled("JAMES-2614 RabbitMQMailQueueTest::concurrentEnqueueDequeueShouldNotFail is unstable. " +
    "The related test is disabled, and need to be re-enabled after investigation and a fix.")
@Test
@Override
public void concurrentEnqueueDequeueShouldNotFail() {
}
/**
 * Enqueues a mail, pauses RabbitMQ for two seconds while attempting a second enqueue, unpauses
 * it, enqueues a third mail and expects the first and third mails to be dequeued within ten
 * seconds.
 *
 * <p>Dequeued names are collected in a {@link ConcurrentLinkedDeque}: the collection is written
 * by the dequeue subscription while Awaitility repeatedly reads it, which a plain
 * {@code ArrayList} does not support safely.
 */
@Test
void dequeueShouldWorkAfterNetworkOutage() throws Exception {
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";

    ConcurrentLinkedDeque<String> receivedNames = new ConcurrentLinkedDeque<>();
    Flux.from(getMailQueue().deQueue())
        .doOnNext(item -> receivedNames.add(item.getMail().getName()))
        .subscribe();

    getMailQueue().enQueue(defaultMail()
        .name(name1)
        .build());

    rabbitMQExtension.getRabbitMQ().pause();
    Thread.sleep(2000);
    try {
        getMailQueue().enQueue(defaultMail()
            .name(name2)
            .build());
    } catch (Exception ignored) {
        // Expected: enqueueing while RabbitMQ is paused may fail; only name1 and name3 are asserted.
    } finally {
        rabbitMQExtension.getRabbitMQ().unpause();
        Thread.sleep(100);
    }

    getMailQueue().enQueue(defaultMail()
        .name(name3)
        .build());

    await().atMost(Duration.ofSeconds(10))
        .untilAsserted(() -> assertThat(receivedNames)
            .contains(name1, name3));
}
/**
 * Enqueues a mail, purges the RabbitMQ work queue, republishes the mails not processed before
 * "clock + 30 minutes" and expects the mail to be dequeued again.
 */
@Test
void enqueuedEmailsShouldNotBeLostDuringRabbitMQOutages() throws Exception {
    String mailName = "myMail";
    try {
        getMailQueue().enQueue(defaultMail()
            .name(mailName)
            .build());
    } catch (Exception e) {
        // Ignore
    }

    // Drop whatever reached the work queue.
    rabbitMQExtension.managementAPI().purgeQueue(
        rabbitMQExtension.getRabbitMQ().getConfiguration().getVhost().orElse("/"),
        "JamesMailQueue-workqueue-spool");

    getMailQueue().republishNotProcessedMails(clock.instant().plus(30, ChronoUnit.MINUTES)).blockLast();

    List<MailQueue.MailQueueItem> dequeuedItems = Flux.from(getMailQueue().deQueue())
        .take(1)
        .collectList()
        .block(Duration.ofSeconds(10));

    assertThat(dequeuedItems)
        .extracting(item -> item.getMail().getName())
        .containsOnly(mailName);
}
/**
 * Enqueues three mails, restarts RabbitMQ and expects the three mails to be dequeued in their
 * enqueue order within ten seconds.
 */
@Test
void messagesShouldSurviveRabbitMQRestart() throws Exception {
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";
    Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());

    for (String name : new String[] {name1, name2, name3}) {
        getMailQueue().enQueue(defaultMail()
            .name(name)
            .build());
    }

    rabbitMQExtension.getRabbitMQ().restart();

    List<MailQueue.MailQueueItem> dequeuedItems = dequeueFlux
        .take(3)
        .collectList()
        .block(Duration.ofSeconds(10));

    assertThat(dequeuedItems)
        .extracting(item -> item.getMail().getName())
        .containsExactly(name1, name2, name3);
}
/**
 * With the clock set two hours in the past, enqueues three mails while the work queue is
 * unbound from its exchange, rebinds it, republishes the mails older than one hour and expects
 * exactly those three mails to be republished and then dequeued.
 */
@Test
void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception {
    clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";
    Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());

    // Avoid early processing and prefetching
    Sender sender = rabbitMQExtension.getSender();
    suspendDequeuing(sender);
    for (String name : new String[] {name1, name2, name3}) {
        getMailQueue().enQueue(defaultMail()
            .name(name)
            .build());
    }
    resumeDequeuing(sender);

    Instant oneHourAgo = Instant.now().minus(Duration.ofHours(1));
    assertThat(getMailQueue()
            .republishNotProcessedMails(oneHourAgo)
            .collectList()
            .block())
        .containsExactlyInAnyOrder(name1, name2, name3);

    // Collect everything dequeued during a ten second window.
    List<MailQueue.MailQueueItem> dequeuedItems = dequeueFlux
        .take(Duration.ofSeconds(10))
        .collectList()
        .block();
    assertThat(dequeuedItems)
        .extracting(item -> item.getMail().getName())
        .containsExactlyInAnyOrder(name1, name2, name3);
}
/**
 * While the work queue is unbound, enqueues two mails with the clock two hours in the past and
 * a third one with the clock at the current time; after rebinding, republishing the mails
 * older than one hour is expected to republish (and later dequeue) only the first two.
 */
@Test
void onlyOldMessagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception {
    clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";
    Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());

    // Avoid early processing and prefetching
    Sender sender = rabbitMQExtension.getSender();
    suspendDequeuing(sender);
    for (String oldName : new String[] {name1, name2}) {
        getMailQueue().enQueue(defaultMail()
            .name(oldName)
            .build());
    }
    // The third mail is enqueued with an up-to-date clock.
    clock.setInstant(Instant.now());
    getMailQueue().enQueue(defaultMail()
        .name(name3)
        .build());
    resumeDequeuing(sender);

    Instant oneHourAgo = Instant.now().minus(Duration.ofHours(1));
    assertThat(getMailQueue()
            .republishNotProcessedMails(oneHourAgo)
            .collectList()
            .block())
        .containsExactlyInAnyOrder(name1, name2);

    List<MailQueue.MailQueueItem> dequeuedItems = dequeueFlux
        .take(Duration.ofSeconds(10))
        .collectList()
        .block();
    assertThat(dequeuedItems)
        .extracting(item -> item.getMail().getName())
        .containsExactlyInAnyOrder(name1, name2);
}
/**
 * Enqueues three old mails while the work queue is unbound, republishes them once before and
 * once after rebinding, and expects each republish to report the three mails and the dequeue
 * to deliver exactly those three.
 */
@Test
void messagesShouldBeProcessedAfterTwoMailsReprocessing() throws Exception {
    clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";
    Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());

    // Avoid early processing and prefetching
    Sender sender = rabbitMQExtension.getSender();
    suspendDequeuing(sender);
    for (String name : new String[] {name1, name2, name3}) {
        getMailQueue().enQueue(defaultMail()
            .name(name)
            .build());
    }

    // First republish, while the work queue is still unbound.
    Instant firstCutOff = Instant.now().minus(Duration.ofHours(1));
    assertThat(getMailQueue()
            .republishNotProcessedMails(firstCutOff)
            .collectList()
            .block())
        .containsExactlyInAnyOrder(name1, name2, name3);

    resumeDequeuing(sender);

    // Second republish, once the work queue is bound again.
    Instant secondCutOff = Instant.now().minus(Duration.ofHours(1));
    assertThat(getMailQueue()
            .republishNotProcessedMails(secondCutOff)
            .collectList()
            .block())
        .containsExactlyInAnyOrder(name1, name2, name3);

    List<MailQueue.MailQueueItem> dequeuedItems = dequeueFlux
        .take(Duration.ofSeconds(10))
        .collectList()
        .block();
    assertThat(dequeuedItems)
        .extracting(item -> item.getMail().getName())
        .containsExactlyInAnyOrder(name1, name2, name3);
}
/**
 * Enqueues one old mail while the work queue is unbound, one recent mail after rebinding, and
 * one more concurrently with a republish of the mails older than one hour; expects the
 * republish to report only the old mail and all three mails to be dequeued.
 */
@Test
void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessedAndNewMessagesShouldNotBeLost() throws Exception {
    clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";
    Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());

    // Avoid early processing and prefetching
    Sender sender = rabbitMQExtension.getSender();
    suspendDequeuing(sender);
    // mail sent while dequeuing is suspended
    getMailQueue().enQueue(defaultMail()
        .name(name1)
        .build());
    resumeDequeuing(sender);

    // mail sent once dequeuing is resumed and before the republish
    clock.setInstant(Instant.now());
    getMailQueue().enQueue(defaultMail()
        .name(name3)
        .build());

    // mail sent concurrently with the republish
    Mono<Boolean> concurrentEnqueue = Mono.fromCallable(() -> {
            getMailQueue().enQueue(defaultMail()
                .name(name2)
                .build());
            return true;
        })
        .subscribeOn(Schedulers.fromExecutor(EXECUTOR));
    Mono<Object> republish = Mono.fromRunnable(() ->
            assertThat(getMailQueue()
                    .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
                    .collectList()
                    .block())
                .containsOnly(name1))
        .subscribeOn(Schedulers.fromExecutor(EXECUTOR));

    Flux.merge(concurrentEnqueue, republish)
        .then()
        .block(Duration.ofSeconds(10));

    List<MailQueue.MailQueueItem> dequeuedItems = dequeueFlux
        .take(Duration.ofSeconds(10))
        .collectList()
        .block();
    assertThat(dequeuedItems)
        .extracting(item -> item.getMail().getName())
        .containsExactlyInAnyOrder(name1, name2, name3);
}
/**
 * Enqueues {@code emailCount} default mails named after indexes 1..emailCount, disposing each
 * mail right after it has been enqueued.
 *
 * @param namePattern maps a 1-based index to the mail name
 * @param emailCount number of mails to enqueue
 */
private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) {
    IntStream.rangeClosed(1, emailCount).forEach(Throwing.intConsumer(index -> {
        String mailName = namePattern.apply(index);
        FakeMail mail = defaultMail().name(mailName).build();
        enQueue(mail);
        LifecycleUtil.dispose(mail);
    }));
}
/**
 * Subscribes to the queue and completes the first {@code times} dequeued items with SUCCESS;
 * any further item received before the subscription is disposed is completed with RETRY.
 * Waits up to ten minutes for at least {@code times} items to be seen, then disposes the
 * subscription.
 *
 * @param times number of items to acknowledge successfully
 */
private void dequeueMails(int times) {
    AtomicInteger seen = new AtomicInteger(0);
    Disposable subscription = Flux.from(getManageableMailQueue().deQueue())
        .concatMap(item -> Mono.fromCallable(() -> {
                boolean withinQuota = seen.getAndIncrement() < times;
                if (!withinQuota) {
                    item.done(MailQueue.MailQueueItem.CompletionStatus.RETRY);
                    // A null result makes the Mono complete empty.
                    return null;
                }
                item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
                return item;
            })
            .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
        .subscribe();

    try {
        await()
            .atMost(Duration.ofMinutes(10))
            .untilAsserted(() -> assertThat(seen.get()).isGreaterThanOrEqualTo(times));
    } finally {
        subscription.dispose();
    }
}
/**
 * Enqueues three mails, makes the blob SELECT query fail once and expects the three mails to
 * still be dequeued within ten seconds.
 */
@Test
void dequeueShouldRetryLoadingErrors(CassandraCluster cassandra) throws Exception {
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";
    for (String name : new String[] {name1, name2, name3}) {
        getMailQueue().enQueue(defaultMail()
            .name(name)
            .build());
    }

    // Fail a single execution of the blob read query.
    cassandra.getConf().registerScenario(fail()
        .times(1)
        .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));

    List<MailQueue.MailQueueItem> dequeuedItems = Flux.from(getMailQueue().deQueue())
        .take(3)
        .collectList()
        .block(Duration.ofSeconds(10));

    assertThat(dequeuedItems)
        .extracting(item -> item.getMail().getName())
        .containsOnly(name1, name2, name3);
}
/**
 * Enqueues three mails, makes the blob SELECT query return no row for one second while
 * dequeuing, restores normal behaviour and expects that no mail was dequeued.
 */
@Test
void dequeueShouldNotRetryWhenBlobIsMissing(CassandraCluster cassandra) throws Exception {
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";
    for (String name : new String[] {name1, name2, name3}) {
        getMailQueue().enQueue(defaultMail()
            .name(name)
            .build());
    }

    cassandra.getConf().registerScenario(returnEmpty()
        .forever()
        .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id"));

    ConcurrentLinkedDeque<String> namesSeen = new ConcurrentLinkedDeque<>();
    Flux.from(getMailQueue().deQueue())
        .take(3)
        .doOnNext(item -> namesSeen.add(item.getMail().getName()))
        .flatMap(item -> Mono.fromRunnable(Throwing.runnable(() -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
            .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
        .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
        .subscribe();

    // One second should be enough to attempt dequeues while we fail to load blobs
    Thread.sleep(1000);

    // Restore normal behaviour
    cassandra.getConf().registerScenario(executeNormally()
        .forever()
        .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id"));

    // Leave one more second to check whether the queue is empty
    Thread.sleep(1000);

    // We expect content missing blob references to be purged from the queue
    assertThat(namesSeen).isEmpty();
}
/**
 * Publishes one non-decodable payload directly to the spool exchange, then enqueues three
 * mails and expects those three mails to be dequeued, in order, within ten seconds.
 */
@Test
void dequeueShouldNotAbortProcessingUponSerializationIssuesErrors() throws Exception {
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";

    // Raw message sent with an empty routing key, bypassing the mail queue API.
    OutboundMessage badMessage = new OutboundMessage("JamesMailQueue-exchange-spool",
        "",
        "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8));
    rabbitMQExtension.getSender()
        .send(Mono.just(badMessage))
        .block();

    for (String name : new String[] {name1, name2, name3}) {
        getMailQueue().enQueue(defaultMail()
            .name(name)
            .build());
    }

    ConcurrentLinkedDeque<String> namesSeen = new ConcurrentLinkedDeque<>();
    Flux.from(getMailQueue().deQueue())
        .doOnNext(item -> namesSeen.add(item.getMail().getName()))
        .flatMap(item -> Mono.fromRunnable(Throwing.runnable(() -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
            .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
        .subscribe();

    await().atMost(TEN_SECONDS)
        .untilAsserted(() -> assertThat(namesSeen)
            .containsExactly(name1, name2, name3));
}
/**
 * Publishes one hundred non-decodable payloads directly to the spool exchange, then enqueues
 * three mails and expects those three mails to be dequeued, in order, within ten seconds.
 */
@Test
void manyInvalidMessagesShouldNotAbortProcessing() throws Exception {
    String name1 = "myMail1";
    String name2 = "myMail2";
    String name3 = "myMail3";

    // Raw messages sent one by one with an empty routing key, bypassing the mail queue API.
    for (int i = 0; i < 100; i++) {
        byte[] payload = ("BAD_PAYLOAD " + i).getBytes(StandardCharsets.UTF_8);
        rabbitMQExtension.getSender()
            .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool", "", payload)))
            .block();
    }

    for (String name : new String[] {name1, name2, name3}) {
        getMailQueue().enQueue(defaultMail()
            .name(name)
            .build());
    }

    ConcurrentLinkedDeque<String> namesSeen = new ConcurrentLinkedDeque<>();
    Flux.from(getMailQueue().deQueue())
        .doOnNext(item -> namesSeen.add(item.getMail().getName()))
        .flatMap(item -> Mono.fromRunnable(Throwing.runnable(() -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
            .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
        .subscribe();

    await().atMost(TEN_SECONDS)
        .untilAsserted(() -> assertThat(namesSeen)
            .containsExactly(name1, name2, name3));
}
/**
 * Publishes one non-decodable payload directly to the spool exchange, starts dequeuing and
 * expects exactly one message to show up on the spool dead-letter queue within ten seconds.
 */
@Test
void invalidMessagesShouldBeDeadLettered() {
    OutboundMessage badMessage = new OutboundMessage("JamesMailQueue-exchange-spool",
        "",
        "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8));
    rabbitMQExtension.getSender()
        .send(Mono.just(badMessage))
        .block();

    // Count every message delivered from the dead-letter queue.
    AtomicInteger deadLettered = new AtomicInteger();
    rabbitMQExtension.getRabbitChannelPool()
        .createReceiver()
        .consumeAutoAck("JamesMailQueue-dead-letter-queue-spool")
        .doOnNext(delivery -> deadLettered.incrementAndGet())
        .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
        .subscribe();

    Flux.from(getMailQueue().deQueue())
        .doOnNext(Throwing.consumer(item -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
        .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
        .subscribe();

    await().atMost(TEN_SECONDS)
        .untilAsserted(() -> assertThat(deadLettered.get()).isEqualTo(1));
}
/**
 * Enqueues one mail, completes its dequeued item with REJECT and expects exactly one message
 * to show up on the spool dead-letter queue within ten seconds.
 */
@Test
void rejectedMessagesShouldBeDeadLettered() throws Exception {
    String mailName = "myMail1";
    getMailQueue().enQueue(defaultMail()
        .name(mailName)
        .build());

    // Reject the first dequeued item and wait for that to complete.
    getMailQueue().deQueue()
        .flatMap(item -> Mono.fromRunnable(Throwing.runnable(() -> item.done(MailQueue.MailQueueItem.CompletionStatus.REJECT)))
            .subscribeOn(Schedulers.boundedElastic())
            .thenReturn(item))
        .blockFirst();

    // Count every message delivered from the dead-letter queue.
    AtomicInteger deadLettered = new AtomicInteger();
    rabbitMQExtension.getRabbitChannelPool()
        .createReceiver()
        .consumeAutoAck("JamesMailQueue-dead-letter-queue-spool")
        .doOnNext(delivery -> deadLettered.incrementAndGet())
        .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
        .subscribe();

    await().atMost(TEN_SECONDS)
        .untilAsserted(() -> assertThat(deadLettered.get()).isEqualTo(1));
}
/**
 * Binds the work queue to its exchange again and waits for the operation to complete.
 *
 * @param sender RabbitMQ sender used to issue the bind
 */
private void resumeDequeuing(Sender sender) {
    BindingSpecification binding = getMailQueueBindingSpecification();
    sender.bindQueue(binding).block();
}
/**
 * Unbinds the work queue from its exchange and waits for the operation to complete.
 *
 * @param sender RabbitMQ sender used to issue the unbind
 */
private void suspendDequeuing(Sender sender) {
    BindingSpecification binding = getMailQueueBindingSpecification();
    sender.unbindQueue(binding).block();
}
/**
 * Describes the binding between the exchange and the work queue of the queue under test,
 * using the empty routing key.
 *
 * @return the binding specification shared by the bind and unbind helpers
 */
private BindingSpecification getMailQueueBindingSpecification() {
    MailQueueName queueName = MailQueueName.fromString(getMailQueue().getName().asString());
    String exchange = queueName.toRabbitExchangeName().asString();
    String workQueue = queueName.toWorkQueueName().asString();

    return BindingSpecification.binding()
        .exchange(exchange)
        .queue(workQueue)
        .routingKey(EMPTY_ROUTING_KEY);
}
}
/**
 * Scenario where the queue is built with size metrics disabled.
 */
@Nested
class MailQueueSizeMetricsDisabled {
    @RegisterExtension
    MailQueueMetricExtension mailQueueMetricExtension = new MailQueueMetricExtension();

    /** Builds the queue with size metrics disabled, a passthrough blob store and a prefetch count of 3. */
    @BeforeEach
    void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
        RabbitMQMailQueueConfiguration sizeMetricsOff = RabbitMQMailQueueConfiguration.builder()
            .sizeMetricsEnabled(false)
            .build();
        BlobStore passthroughBlobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
            .passthrough();

        setUp(cassandra, metricTestSystem, sizeMetricsOff, passthroughBlobStore, MailQueueFactory.prefetchCount(3));
    }

    /** Verifies that nothing was registered on the spied gauge registry while the queue was built. */
    @Test
    void constructorShouldNotRegisterGetQueueSizeGaugeWhenSizeMetricsDisabled(MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
        ArgumentCaptor<Gauge<?>> anyGauge = ArgumentCaptor.forClass(Gauge.class);

        verify(metricTestSystem.getSpyGaugeRegistry(), never())
            .register(any(), anyGauge.capture());
    }
}
/**
 * Scenario where the queue is backed by a blob store built with deduplication.
 */
@Nested
class DeDuplicationTest {
    @RegisterExtension
    MailQueueMetricExtension mailQueueMetricExtension = new MailQueueMetricExtension();

    /** Builds the queue with size metrics enabled, a deduplicating blob store and a prefetch count of 3. */
    @BeforeEach
    void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
        RabbitMQMailQueueConfiguration sizeMetricsOn = RabbitMQMailQueueConfiguration.builder()
            .sizeMetricsEnabled(true)
            .build();
        BlobStore deduplicatingBlobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
            .deduplication();

        setUp(cassandra, metricTestSystem, sizeMetricsOn, deduplicatingBlobStore, MailQueueFactory.prefetchCount(3));
    }

    /**
     * Enqueues two mails sharing the same subject and text, then dequeues both, checking the
     * content of each before completing it with SUCCESS.
     */
    @Test
    void dequeueShouldStillRetrieveAllBlobsWhenIdenticalContentAndDeduplication() throws Exception {
        String identicalContent = "identical content";
        String identicalSubject = "identical subject";

        for (String mailName : new String[] {"myMail1", "myMail2"}) {
            mailQueue.enQueue(defaultMail()
                .name(mailName)
                .mimeMessage(MimeMessageBuilder.mimeMessageBuilder()
                    .setSubject(identicalSubject)
                    .setText(identicalContent))
                .build());
        }

        // Items are processed one after the other; a failed content check fails the blocking call.
        Flux.from(mailQueue.deQueue())
            .take(2)
            .concatMap(item -> Mono.fromCallable(() -> {
                    assertThat(item.getMail().getMessage().getContent()).isEqualTo(identicalContent);
                    item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
                    return item;
                })
                .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
            .collectList()
            .block(Duration.ofSeconds(10));
    }
}
/**
 * Creates the test clock, the Cassandra mail queue view factory, the RabbitMQ mail queue
 * factory and finally the "spool" queue, storing the results in this test's fields.
 *
 * @param cassandra Cassandra cluster whose session backs the mail queue view
 * @param metricTestSystem provides the metric factory and the spied gauge registry
 * @param configuration RabbitMQ mail queue configuration to apply
 * @param blobStore blob store backing the MIME message store
 * @param prefetchCount prefetch count used when creating the queue
 */
private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration,
                   BlobStore blobStore, MailQueueFactory.PrefetchCount prefetchCount) throws Exception {
    clock = new UpdatableTickingClock(IN_SLICE_1);
    MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);

    CassandraMailQueueViewConfiguration viewConfiguration = CassandraMailQueueViewConfiguration.builder()
        .bucketCount(THREE_BUCKET_COUNT)
        .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
        .sliceWindow(ONE_HOUR_SLICE_WINDOW)
        .build();
    MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(
        clock,
        cassandra.getConf(),
        viewConfiguration,
        mimeMessageStoreFactory);

    RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
        metricTestSystem.getMetricFactory(),
        metricTestSystem.getSpyGaugeRegistry(),
        rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(),
        mimeMessageStoreFactory,
        BLOB_ID_FACTORY,
        mailQueueViewFactory,
        clock,
        new RawMailQueueItemDecoratorFactory(),
        configuration);

    mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
    mailQueueFactory = new RabbitMQMailQueueFactory(
        rabbitMQExtension.getSender(),
        mqManagementApi,
        privateFactory,
        rabbitMQExtension.getRabbitMQ().getConfiguration());
    mailQueue = mailQueueFactory.createQueue(SPOOL, prefetchCount);
}
}