| 'use strict'; |
| |
| const EventEmitter = require('events'); |
| const packageData = require('../../package.json'); |
| const shared = require('../shared'); |
| const LeWindows = require('../sendmail-transport/le-windows'); |
| |
| /** |
| * Generates a Transport object for Sendmail |
| * |
| * Possible options can be the following: |
| * |
| * * **path** optional path to sendmail binary |
| * * **args** an array of arguments for the sendmail binary |
| * |
| * @constructor |
| * @param {Object} optional config parameter for the AWS Sendmail service |
| */ |
| class SESTransport extends EventEmitter { |
| constructor(options) { |
| super(); |
| options = options || {}; |
| |
| this.options = options || {}; |
| this.ses = this.options.SES; |
| |
| this.name = 'SESTransport'; |
| this.version = packageData.version; |
| |
| this.logger = shared.getLogger(this.options, { |
| component: this.options.component || 'ses-transport' |
| }); |
| |
| // parallel sending connections |
| this.maxConnections = Number(this.options.maxConnections) || Infinity; |
| this.connections = 0; |
| |
| // max messages per second |
| this.sendingRate = Number(this.options.sendingRate) || Infinity; |
| this.sendingRateTTL = null; |
| this.rateInterval = 1000; |
| this.rateMessages = []; |
| |
| this.pending = []; |
| |
| this.idling = true; |
| |
| setImmediate(() => { |
| if (this.idling) { |
| this.emit('idle'); |
| } |
| }); |
| } |
| |
| /** |
| * Schedules a sending of a message |
| * |
| * @param {Object} emailMessage MailComposer object |
| * @param {Function} callback Callback function to run when the sending is completed |
| */ |
| send(mail, callback) { |
| if (this.connections >= this.maxConnections) { |
| this.idling = false; |
| return this.pending.push({ |
| mail, |
| callback |
| }); |
| } |
| |
| if (!this._checkSendingRate()) { |
| this.idling = false; |
| return this.pending.push({ |
| mail, |
| callback |
| }); |
| } |
| |
| this._send(mail, (...args) => { |
| setImmediate(() => callback(...args)); |
| this._sent(); |
| }); |
| } |
| |
| _checkRatedQueue() { |
| if (this.connections >= this.maxConnections || !this._checkSendingRate()) { |
| return; |
| } |
| |
| if (!this.pending.length) { |
| if (!this.idling) { |
| this.idling = true; |
| this.emit('idle'); |
| } |
| return; |
| } |
| |
| let next = this.pending.shift(); |
| this._send(next.mail, (...args) => { |
| setImmediate(() => next.callback(...args)); |
| this._sent(); |
| }); |
| } |
| |
| _checkSendingRate() { |
| clearTimeout(this.sendingRateTTL); |
| |
| let now = Date.now(); |
| let oldest = false; |
| // delete older messages |
| for (let i = this.rateMessages.length - 1; i >= 0; i--) { |
| |
| if (this.rateMessages[i].ts >= now - this.rateInterval && (!oldest || this.rateMessages[i].ts < oldest)) { |
| oldest = this.rateMessages[i].ts; |
| } |
| |
| if (this.rateMessages[i].ts < now - this.rateInterval && !this.rateMessages[i].pending) { |
| this.rateMessages.splice(i, 1); |
| } |
| } |
| |
| if (this.rateMessages.length < this.sendingRate) { |
| return true; |
| } |
| |
| let delay = Math.max(oldest + 1001, now + 20); |
| this.sendingRateTTL = setTimeout(() => this._checkRatedQueue(), now - delay); |
| this.sendingRateTTL.unref(); |
| return false; |
| } |
| |
| _sent() { |
| this.connections--; |
| this._checkRatedQueue(); |
| } |
| |
| /** |
| * Returns true if there are free slots in the queue |
| */ |
| isIdle() { |
| return this.idling; |
| } |
| |
| /** |
| * Compiles a mailcomposer message and forwards it to SES |
| * |
| * @param {Object} emailMessage MailComposer object |
| * @param {Function} callback Callback function to run when the sending is completed |
| */ |
| _send(mail, callback) { |
| let statObject = { |
| ts: Date.now(), |
| pending: true |
| }; |
| this.connections++; |
| this.rateMessages.push(statObject); |
| |
| let envelope = mail.data.envelope || mail.message.getEnvelope(); |
| let messageId = mail.message.messageId(); |
| |
| let recipients = [].concat(envelope.to || []); |
| if (recipients.length > 3) { |
| recipients.push('...and ' + recipients.splice(2).length + ' more'); |
| } |
| this.logger.info({ |
| tnx: 'send', |
| messageId |
| }, 'Sending message %s to <%s>', messageId, recipients.join(', ')); |
| |
| let getRawMessage = next => { |
| |
| // do not use Message-ID and Date in DKIM signature |
| if (!mail.data._dkim) { |
| mail.data._dkim = {}; |
| } |
| if (mail.data._dkim.skipFields && typeof mail.data._dkim.skipFields === 'string') { |
| mail.data._dkim.skipFields += ':date:message-id'; |
| } else { |
| mail.data._dkim.skipFields = 'date:message-id'; |
| } |
| |
| let sourceStream = mail.message.createReadStream(); |
| let stream = sourceStream.pipe(new LeWindows()); |
| let chunks = []; |
| let chunklen = 0; |
| |
| stream.on('readable', () => { |
| let chunk; |
| while ((chunk = stream.read()) !== null) { |
| chunks.push(chunk); |
| chunklen += chunk.length; |
| } |
| }); |
| |
| sourceStream.once('error', err => stream.emit('error', err)); |
| |
| stream.once('error', err => { |
| next(err); |
| }); |
| |
| stream.once('end', () => next(null, Buffer.concat(chunks, chunklen))); |
| }; |
| |
| setImmediate(() => getRawMessage((err, raw) => { |
| if (err) { |
| this.logger.error({ |
| err, |
| tnx: 'send', |
| messageId |
| }, 'Failed creating message for %s. %s', messageId, err.message); |
| statObject.pending = false; |
| return callback(err); |
| } |
| |
| let sesMessage = { |
| RawMessage: { // required |
| Data: raw // required |
| }, |
| Source: envelope.from, |
| Destinations: envelope.to |
| }; |
| |
| Object.keys(mail.data.ses || {}).forEach(key => { |
| sesMessage[key] = mail.data.ses[key]; |
| }); |
| |
| this.ses.sendRawEmail(sesMessage, (err, data) => { |
| if (err) { |
| this.logger.error({ |
| err, |
| tnx: 'send' |
| }, 'Send error for %s: %s', messageId, err.message); |
| statObject.pending = false; |
| return callback(err); |
| } |
| |
| let region = this.ses.config && this.ses.config.region || 'us-east-1'; |
| if (region === 'us-east-1') { |
| region = 'email'; |
| } |
| |
| statObject.pending = false; |
| callback(null, { |
| envelope: { |
| from: envelope.from, |
| to: envelope.to |
| }, |
| messageId: '<' + data.MessageId + (!/@/.test(data.MessageId) ? '@' + region + '.amazonses.com' : '') + '>', |
| response: data.MessageId |
| }); |
| }); |
| })); |
| } |
| |
| /** |
| * Verifies SES configuration |
| * |
| * @param {Function} callback Callback function |
| */ |
| verify(callback) { |
| let promise; |
| |
| if (!callback && typeof Promise === 'function') { |
| promise = new Promise((resolve, reject) => { |
| callback = shared.callbackPromise(resolve, reject); |
| }); |
| } |
| |
| this.ses.sendRawEmail({ |
| RawMessage: { // required |
| Data: 'From: invalid@invalid\r\nTo: invalid@invalid\r\n Subject: Invalid\r\n\r\nInvalid' |
| }, |
| Source: 'invalid@invalid', |
| Destinations: ['invalid@invalid'] |
| }, err => { |
| if (err && err.code !== 'InvalidParameterValue') { |
| return callback(err); |
| } |
| return callback(null, true); |
| }); |
| |
| return promise; |
| } |
| } |
| |
| module.exports = SESTransport; |