| # 31. Distributed Mail Queue |
| |
| Date: 2020-04-13 |
| |
| ## Status |
| |
| Accepted (lazy consensus) & implemented |
| |
| ## Context |
| |
The MailQueue is a central component of SMTP infrastructure, allowing asynchronous mail processing. This enables a short
SMTP reply time despite a potentially longer mail processing time. It also acts as a buffer during SMTP peak workloads
so as not to overload the server.
| |
Furthermore, when used as a Mail Exchange server (MX), the ability to add delays to be observed before dequeuing elements
allows, among other things:
| |
| - Delaying retries upon MX delivery failure to a remote site. |
- Throttling, which can be helpful to avoid being considered a spammer.
| |
| A mailqueue also enables advanced administration operations like traffic review, discarding emails, resetting wait |
| delays, purging the queue, etc. |
| |
The Spring implementation and the non-distributed implementations rely on an embedded ActiveMQ to implement the MailQueue.
Emails are stored on the local file system. An administrator wishing to administrate the mailQueue thus needs to
interact with every one of their James servers, which is not friendly in a distributed setup.
| |
Distributed James relies on the following third-party software (among others):
| |
- **RabbitMQ** for messaging. It is good at holding a queue; however, some advanced administrative operations can't be
implemented with this component alone. This is the case for `browse`, `getSize` and `arbitrary mail removal`.
- **Cassandra** is the metadata database. Due to **tombstones** being used for deletes, implementing a queue on top of it is a well-known anti-pattern.
| - **ObjectStorage** (Swift or S3) holds byte content. |
| |
| ## Decision |
| |
Distributed James should ship a distributed MailQueue composed of the following software components, with the following
responsibilities:
| |
| - **RabbitMQ** for messaging. A rabbitMQ consumer will trigger dequeue operations. |
- A time series projection of the queue content (a time-ordered list of mail metadata) will be maintained in **Cassandra** (see below). Time series avoid the
aforementioned tombstone anti-pattern, and no polling is performed on this projection.
- **ObjectStorage** (Swift or S3) holds large byte content. This avoids overwhelming other components which do not scale
as well in terms of input/output operations per second.
| |
Here are the details of the tables composing the Cassandra MailQueue View data model (a hypothetical schema sketch follows the list):
| |
- **enqueuedMailsV3** holds the time series. The primary key holds the queue name, the (rounded) time of enqueue,
referred to as a slice, and a bucket number. Slicing enables listing a large amount of items from a given point in time, in a
fashion that is not achievable with a classic partition approach. The bucket number enables sharding and prevents all writes
at a given point in time from going to the same Cassandra partition. The clustering key is composed of an enqueueId - a
unique identifier. The content holds the metadata of the email. This table enables loading, from a starting date, all of
the emails that have ever been in the mailQueue. Its content is never deleted.
- **deletedMailsV2** tells whether a mail stored in *enqueuedMailsV3* has been deleted or not. The queueName and
enqueueId are used as the primary key. This table is updated upon dequeues and deletes, and is queried upon dequeue
to filter out deleted/purged items.
- **browseStart** stores the latest known point in time before which all emails have been deleted/dequeued. It
enables skipping most deleted items when browsing or deleting queue content. Its update is probability-based and
asynchronously piggy-backed on dequeue.
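
To make this data model more concrete, here is a minimal, hypothetical CQL sketch of the three tables, expressed as Java
string constants as they could appear in a schema module. The table names follow the description above, but the column
names and types are assumptions for illustration and may differ from the actual James implementation.

```java
/**
 * Hypothetical sketch of the Cassandra MailQueue View schema described above.
 * Column names and types are assumptions; the actual James schema may differ.
 */
public interface MailQueueViewSchemaSketch {

    // Time series of enqueued mails. The partition key (queueName, sliceStart, bucketId)
    // spreads the writes of a given slice over bucketCount partitions. Rows are never deleted.
    String ENQUEUED_MAILS_V3 =
        "CREATE TABLE enqueuedMailsV3 ("
            + " queueName text,"
            + " sliceStart timestamp,"   // enqueue time rounded down to sliceWindow
            + " bucketId int,"           // in [0, bucketCount)
            + " enqueueId uuid,"         // unique identifier of this enqueue (clustering key)
            + " enqueuedTime timestamp,"
            + " mailMetadata text,"      // serialized mail metadata (envelope, headers, ...)
            + " headerBlobId text,"      // references to the content stored in the object storage
            + " bodyBlobId text,"
            + " PRIMARY KEY ((queueName, sliceStart, bucketId), enqueueId))";

    // Marker table recording that a given enqueued mail was already dequeued or deleted.
    String DELETED_MAILS_V2 =
        "CREATE TABLE deletedMailsV2 ("
            + " queueName text,"
            + " enqueueId uuid,"
            + " PRIMARY KEY (queueName, enqueueId))";

    // Latest point in time before which all mails are known to be dequeued/deleted.
    String BROWSE_START =
        "CREATE TABLE browseStart ("
            + " queueName text PRIMARY KEY,"
            + " browseStart timestamp)";
}
```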
| |
Here are the main mail operation sequences (a simplified dequeue sketch follows the list):
| |
- Upon **enqueue**, the mail content is stored in the *object storage*, an entry is added in *enqueuedMailsV3* and a message
is published to *rabbitMQ*.
- **Dequeue** is triggered by the reception of a rabbitMQ message. *deletedMailsV2* is queried to know if the message has
already been deleted. If not, the mail content is retrieved from the *object storage*, then an entry is added in
*deletedMailsV2* to record that the email has been dequeued. A dequeue has a random probability of triggering a browse start
update. If so, starting from the current browse start, the *enqueuedMailsV3* content is iterated and checked against *deletedMailsV2*
until the first non-deleted/non-dequeued email is found. This point becomes the new browse start. The browse start can never
point after the start of the current slice. A grace period upon browse start update is left to tolerate clock skew.
Updating the browse start randomly is a simple way to avoid synchronisation in a distributed system: it ensures
liveness, while unneeded browseStart updates merely waste a few resources.
- Upon **browse**, the *enqueuedMailsV3* content is iterated and checked against *deletedMailsV2*, starting from the
current browse start.
- Upon **delete/purge**, the *enqueuedMailsV3* content is iterated and checked against *deletedMailsV2*. Mails matching
the condition are marked as deleted in *deletedMailsV2*.
| - Upon **getSize**, we perform a browse and count the returned elements. |
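
To illustrate the dequeue sequence and its probabilistic, piggy-backed browse start update, here is a simplified Java
sketch. All interfaces, types and method names below are assumptions made for illustration and do not mirror the actual
James APIs; details such as the current-slice cap and the clock-skew grace period are only hinted at in comments.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;

// Simplified, hypothetical sketch of the dequeue flow described above.
public class DequeueFlowSketch {

    interface DeletedMailsDao {                       // backed by deletedMailsV2
        boolean isDeleted(String queueName, UUID enqueueId);
        void markAsDeleted(String queueName, UUID enqueueId);
    }

    interface EnqueuedMailsDao {                      // backed by enqueuedMailsV3
        EnqueuedItem find(String queueName, UUID enqueueId);
        Iterable<EnqueuedItem> iterateFrom(String queueName, Instant browseStart);
    }

    interface BrowseStartDao {                        // backed by browseStart
        Instant get(String queueName);
        void update(String queueName, Instant newBrowseStart);
    }

    interface BlobStore {                             // Swift / S3 object storage
        byte[] read(String blobId);
    }

    record EnqueuedItem(UUID enqueueId, Instant sliceStart, String blobId) {}

    private final EnqueuedMailsDao enqueuedMails;
    private final DeletedMailsDao deletedMails;
    private final BrowseStartDao browseStart;
    private final BlobStore blobStore;
    private final double updateBrowseStartProbability;
    private final Random random = new Random();

    DequeueFlowSketch(EnqueuedMailsDao enqueuedMails, DeletedMailsDao deletedMails,
                      BrowseStartDao browseStart, BlobStore blobStore,
                      double updateBrowseStartProbability) {
        this.enqueuedMails = enqueuedMails;
        this.deletedMails = deletedMails;
        this.browseStart = browseStart;
        this.blobStore = blobStore;
        this.updateBrowseStartProbability = updateBrowseStartProbability;
    }

    // Invoked when a rabbitMQ message referencing (queueName, enqueueId) is received.
    Optional<byte[]> dequeue(String queueName, UUID enqueueId) {
        if (deletedMails.isDeleted(queueName, enqueueId)) {
            return Optional.empty();                        // already deleted/purged: skip it
        }
        EnqueuedItem item = enqueuedMails.find(queueName, enqueueId);
        byte[] content = blobStore.read(item.blobId());     // load the content from the object storage
        deletedMails.markAsDeleted(queueName, enqueueId);   // record that the mail was dequeued
        maybeUpdateBrowseStart(queueName);                  // probabilistic, piggy-backed update
        return Optional.of(content);
    }

    private void maybeUpdateBrowseStart(String queueName) {
        if (random.nextDouble() >= updateBrowseStartProbability) {
            return;                                         // most dequeues skip the update
        }
        Instant start = browseStart.get(queueName);
        // Walk the time series from the current browse start and stop at the first mail that
        // was not yet dequeued/deleted: its slice becomes the new browse start. The real
        // implementation also never moves the browse start past the start of the current slice
        // and applies a grace period to tolerate clock skew (both omitted here).
        for (EnqueuedItem item : enqueuedMails.iterateFrom(queueName, start)) {
            if (!deletedMails.isDeleted(queueName, item.enqueueId())) {
                browseStart.update(queueName, item.sliceStart());
                return;
            }
        }
    }
}
```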
| |
The distributed mail queue requires a fine-tuned configuration, which mostly depends on the number of Cassandra servers
and on the mailQueue throughput:
- **sliceWindow** is the time period of a slice. All the elements of **enqueuedMailsV3** sharing the same slice are
retrieved at once. The bigger it is, the more elements are read at once, and the less frequent browse start updates
will be. Lower values might result in many almost-empty slices being read, generating a higher read load. We recommend
choosing **sliceWindow** from the user's maximum throughput so that a slice contains approximately 10,000 emails
(see the sizing sketch after this list). Only values dividing the current *sliceWindow* are allowed as new values (otherwise previous slices might not be found).
- **bucketCount** enables spreading the writes across your Cassandra cluster using a bucketing strategy. Low values will
lead to the workload not being spread evenly, while higher values might result in unneeded reads upon browse. The count of Cassandra
servers should be a good starting value. Only increasing the count of buckets is supported as a configuration update, as
decreasing the bucket count might result in some buckets being lost.
- **updateBrowseStartPace** governs the probability of updating browseStart upon dequeues/deletes. We recommend choosing
a value guaranteeing a reasonable probability of updating the browse start every few slices. Too big a value will lead to
unneeded updates of not-yet-finished slices. Too low a value will end up in more expensive browseStart updates and in browses
iterating through slices whose content is entirely deleted. This value can be changed freely.
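
As an illustration of the sliceWindow sizing rule, the following hypothetical helper computes a candidate value from the
peak throughput so that a slice holds roughly 10,000 emails. Only the 10,000 target comes from the recommendation above;
the helper itself is not part of James.

```java
import java.time.Duration;

// Hypothetical helper illustrating the sliceWindow sizing rule: pick the slice
// duration so that a slice holds roughly 10,000 mails at peak throughput.
public class SliceWindowSizing {

    private static final long TARGET_MAILS_PER_SLICE = 10_000;

    static Duration suggestedSliceWindow(double peakMailsPerSecond) {
        long seconds = Math.round(TARGET_MAILS_PER_SLICE / peakMailsPerSecond);
        return Duration.ofSeconds(seconds);
    }

    public static void main(String[] args) {
        // e.g. ~10 mails/s at peak -> a slice window of about 1000 s (~17 minutes)
        System.out.println(suggestedSliceWindow(10.0));   // prints PT16M40S
    }
}
```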
| |
We rely on event sourcing to validate mailQueue configuration changes upon James start, following the aforementioned rules
(a validation sketch is given below).
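
Here is a hypothetical sketch of what such a configuration-change validation could look like, based solely on the rules
stated above (bucketCount may only increase, a new sliceWindow must divide the previous one, updateBrowseStartPace is
free). Names are illustrative and do not reflect the actual James event sourcing implementation.

```java
import java.time.Duration;

// Hypothetical sketch of the configuration-change validation described above.
public class MailQueueViewConfigurationValidation {

    record Configuration(int bucketCount, int updateBrowseStartPace, Duration sliceWindow) {}

    // Validates a new configuration against the previously recorded one.
    static void validateChange(Configuration previous, Configuration next) {
        if (next.bucketCount() < previous.bucketCount()) {
            // decreasing the bucket count could make existing buckets unreachable
            throw new IllegalArgumentException("bucketCount can only be increased");
        }
        if (previous.sliceWindow().toMillis() % next.sliceWindow().toMillis() != 0) {
            // a new sliceWindow must divide the previous one, otherwise previously
            // written slices would no longer fall on slice boundaries and could be missed
            throw new IllegalArgumentException("sliceWindow can only be changed to a divisor of its previous value");
        }
        // updateBrowseStartPace can be changed freely: no validation needed
    }
}
```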
| |
| ## Limitations |
| |
| Delays are not supported. This mail queue implementation is thus not suited for a Mail Exchange (MX) implementation. |
| The [following proposal](https://issues.apache.org/jira/browse/JAMES-2896) could be a solution to support delays. |
| |
**enqueuedMailsV3** and **deletedMailsV2** are never cleaned up and the corresponding blobs are always referenced. This is not
ideal from both a privacy and a storage cost point of view.
| |
The **getSize** operation is sub-optimal and thus not efficient. Combined with mail queue size metrics being periodically
reported by all James servers, this can lead to a Cassandra overload as throughput increases. A configuration
parameter allows disabling mail queue size reporting as a temporary solution. Some alternatives have been presented, like
[an eventually consistent per-slice counter approach](https://github.com/linagora/james-project/pull/2565). Another
proposed solution is [to rely on the RabbitMQ management API to retrieve the mail queue size](https://github.com/linagora/james-project/pull/2325);
however, by design it cannot take purge/delete operations into account. Read
[the corresponding JIRA](https://issues.apache.org/jira/browse/JAMES-2733).
| |
| ## Consequences |
| |
The distributed mail queue allows better spreading of the mail processing workload. It enables centralized mailQueue
management for all James servers.

Yet some additional work is required to use it in a Mail Exchange (MX) scenario.
| |
| ## References |
| |
| * [JIRA](https://issues.apache.org/jira/browse/JAMES-2541) |