blob: 62050e9ee792b5d5149b9b97a358560f87c3061e [file] [log] [blame]
= 31. Distributed Mail Queue
Date: 2020-04-13
== Status
Accepted (lazy consensus)
== Context
MailQueue is a central component of SMTP infrastructure allowing asynchronous mail processing.
This enables a short SMTP reply time despite a potentially longer mail processing time.
It also works as a buffer during SMTP peak workload to not overload a server.
Furthermore, when used as a Mail Exchange server (MX), the ability to add delays to be observed before dequeing elements allows, among others:
* Delaying retries upon MX delivery failure to a remote site.
* Throttling, which could be helpful for not being considered a spammer.
A mailqueue also enables advanced administration operations like traffic review, discarding emails, resetting wait delays, purging the queue, etc.
Spring implementation and non distributed implementations rely on an embedded ActiveMQ to implement the MailQueue.
Emails are being stored in a local file system.
An administrator wishing to administrate the mailQueue will thus need to interact with all its James servers, which is not friendly in a distributed setup.
Distributed James relies on the following third party softwares (among other):
* *RabbitMQ* for messaging.
Good at holding a queue, however some advanced administrative operations can't be implemented with this component alone.
This is the case for `browse`, `getSize` and `arbitrary mail removal`.
* *Cassandra* is the metadata database.
Due to *tombstone* being used for delete, queue is a well known anti-pattern.
* *ObjectStorage* (Swift or S3) holds byte content.
== Decision
Distributed James should ship a distributed MailQueue composing the following softwares with the following responsibilities:
* *RabbitMQ* for messaging.
A rabbitMQ consumer will trigger dequeue operations.
* A time series projection of the queue content (order by time list of mail metadata) will be maintained in *Cassandra* (see later).
Time series avoid the aforementioned tombstone anti-pattern, and no polling is performed on this projection.
* *ObjectStorage* (Swift or S3) holds large byte content.
This avoids overwhelming other softwares which do not scale as well in term of Input/Output operation per seconds.
Here are details of the tables composing Cassandra MailQueue View data-model:
* *enqueuedMailsV3* holds the time series.
The primary key holds the queue name, the (rounded) time of enqueue designed as a slice, and a bucketCount.
Slicing enables listing a large amount of items from a given point in time, in an fashion that is not achievable with a classic partition approach.
The bucketCount enables sharding and avoids all writes at a given point in time to go to the same Cassandra partition.
The clustering key is composed of an enqueueId - a unique identifier.
The content holds the metadata of the email.
This table enables, from a starting date, to load all of the emails that have ever been in the mailQueue.
Its content is never deleted.
* *deletedMailsV2* tells wether a mail stored in _enqueuedMailsV3_ had been deleted or not.
The queueName and enqueueId are used as primary key.
This table is updated upon dequeue and deletes.
This table is queried upon dequeue to filter out deleted/purged items.
* *browseStart* store the latest known point in time from which all previous emails had been deleted/dequeued.
It enables to skip most deleted items upon browsing/deleting queue content.
Its update is probability based and asynchronously piggy backed on dequeue.
Here are the main mail operation sequences:
* Upon *enqueue* mail content is stored in the _object storage_, an entry is added in _enqueuedMailsV3_ and a message is fired on _rabbitMQ_.
* *dequeue* is triggered by a rabbitMQ message to be received.
_deletedMailsV2_ is queried to know if the message had already been deleted.
If not, the mail content is retrieved from the _object storage_, then an entry is added in _deletedMailsV2_ to notice the email had been dequeued.
A dequeue has a random probability to trigger a browse start update.
If so, from current browse start, _enqueuedMailsV3_ content is iterated, and checked against _deletedMailsV2_ until the first non deleted / dequeued email is found.
This point becomes the new browse start.
BrowseStart can never point after the start of the current slice.
A grace period upon browse start update is left to tolerate clock skew.
Update of the browse start is done randomly as it is a simple way to avoid synchronisation in a distributed system: we ensure liveness while uneeded browseStart updates being triggered would simply waste a few resources.
* Upon *browse*, _enqueuedMailsV3_ content is iterated, and checked against _deletedMailsV2_, starting from the current browse start.
* Upon *delete/purge*, _enqueuedMailsV3_ content is iterated, and checked against _deletedMailsV2_.
Mails matching the condition are marked as deleted in _enqueuedMailsV3_.
* Upon *getSize*, we perform a browse and count the returned elements.
The distributed mail queue requires a fine tuned configuration, which mostly depends of the count of Cassandra servers, and of the mailQueue throughput:
* *sliceWindow* is the time period of a slice.
All the elements of *enqueuedMailsV3* sharing the same slice are retrieved at once.
The bigger, the more elements are going to be read at once, the less frequent browse start update will be.
Lower values might result in many almost empty slices to be read, generating higher read load.
We recommend *sliceWindow* to be chosen from users maximum throughput so that approximately 10.000 emails be contained in a slice.
Only values dividing the current _sliceWindow_ are allowed as new values (otherwize previous slices might not be found).
* *bucketCount* enables spreading the writes in your Cassandra cluster using a bucketting strategy.
Low values will lead to workload not to be spread evenly, higher values might result in uneeded reads upon browse.
The count of Cassandra servers should be a good starting value.
Only increasing the count of buckets is supported as a configuration update as decreasing the bucket count might result in some buckets to be lost.
* *updateBrowseStartPace* governs the probability of updating browseStart upon dequeue/deletes.
We recommend choosing a value guarantying a reasonable probability of updating the browse start every few slices.
Too big values will lead to uneeded update of not yet finished slices.
Too low values will end up in a more expensive browseStart update and browse iterating through slices with all their content deleted.
This value can be changed freely.
We rely on eventSourcing to validate the mailQueue configuration changes upon James start following the aforementioned rules.
== Limitations
Delays are not supported.
This mail queue implementation is thus not suited for a Mail Exchange (MX) implementation.
The https://issues.apache.org/jira/browse/JAMES-2896[following proposal] could be a solution to support delays.
*enqueuedMailsV3* and *deletedMailsV2* is never cleaned up and the corresponding blobs are always referenced.
This is not ideal both from a privacy and space storage costs point of view.
*getSize* operation is sub-optimal and thus not efficient.
Combined with metric reporting of mail queue size being periodically performed by all James servers this can lead, upon increasing throughput to a Cassandra overload.
A configuration parameter allows to disable mail queue size reporting as a temporary solution.
Some alternatives had been presented like https://github.com/linagora/james-project/pull/2565[an eventually consistent per slice counters approach].
An other proposed solution is https://github.com/linagora/james-project/pull/2325[to rely on RabbitMQ management API to retrieve mail queue size] however by design it cannot take into account purge/delete operations.
Read https://issues.apache.org/jira/browse/JAMES-2733[the corresponding JIRA].
== Consequences
Distributed mail queue allows a better spreading of Mail processing workload.
It enables a centralized mailQueue management for all James servers.
Yet some additional work is required to use it as a Mail Exchange scenario.