blob: 79faceeaf9e4e93c82ffb024541fa4d00b636c87 [file] [log] [blame]
'use strict';
const EventEmitter = require('events');
const packageData = require('../../package.json');
const shared = require('../shared');
const LeWindows = require('../sendmail-transport/le-windows');
/**
* Generates a Transport object for Sendmail
*
* Possible options can be the following:
*
* * **path** optional path to sendmail binary
* * **args** an array of arguments for the sendmail binary
*
* @constructor
* @param {Object} optional config parameter for the AWS Sendmail service
*/
class SESTransport extends EventEmitter {
constructor(options) {
super();
options = options || {};
this.options = options || {};
this.ses = this.options.SES;
this.name = 'SESTransport';
this.version = packageData.version;
this.logger = shared.getLogger(this.options, {
component: this.options.component || 'ses-transport'
});
// parallel sending connections
this.maxConnections = Number(this.options.maxConnections) || Infinity;
this.connections = 0;
// max messages per second
this.sendingRate = Number(this.options.sendingRate) || Infinity;
this.sendingRateTTL = null;
this.rateInterval = 1000;
this.rateMessages = [];
this.pending = [];
this.idling = true;
setImmediate(() => {
if (this.idling) {
this.emit('idle');
}
});
}
/**
* Schedules a sending of a message
*
* @param {Object} emailMessage MailComposer object
* @param {Function} callback Callback function to run when the sending is completed
*/
send(mail, callback) {
if (this.connections >= this.maxConnections) {
this.idling = false;
return this.pending.push({
mail,
callback
});
}
if (!this._checkSendingRate()) {
this.idling = false;
return this.pending.push({
mail,
callback
});
}
this._send(mail, (...args) => {
setImmediate(() => callback(...args));
this._sent();
});
}
_checkRatedQueue() {
if (this.connections >= this.maxConnections || !this._checkSendingRate()) {
return;
}
if (!this.pending.length) {
if (!this.idling) {
this.idling = true;
this.emit('idle');
}
return;
}
let next = this.pending.shift();
this._send(next.mail, (...args) => {
setImmediate(() => next.callback(...args));
this._sent();
});
}
_checkSendingRate() {
clearTimeout(this.sendingRateTTL);
let now = Date.now();
let oldest = false;
// delete older messages
for (let i = this.rateMessages.length - 1; i >= 0; i--) {
if (this.rateMessages[i].ts >= now - this.rateInterval && (!oldest || this.rateMessages[i].ts < oldest)) {
oldest = this.rateMessages[i].ts;
}
if (this.rateMessages[i].ts < now - this.rateInterval && !this.rateMessages[i].pending) {
this.rateMessages.splice(i, 1);
}
}
if (this.rateMessages.length < this.sendingRate) {
return true;
}
let delay = Math.max(oldest + 1001, now + 20);
this.sendingRateTTL = setTimeout(() => this._checkRatedQueue(), now - delay);
this.sendingRateTTL.unref();
return false;
}
_sent() {
this.connections--;
this._checkRatedQueue();
}
/**
* Returns true if there are free slots in the queue
*/
isIdle() {
return this.idling;
}
/**
* Compiles a mailcomposer message and forwards it to SES
*
* @param {Object} emailMessage MailComposer object
* @param {Function} callback Callback function to run when the sending is completed
*/
_send(mail, callback) {
let statObject = {
ts: Date.now(),
pending: true
};
this.connections++;
this.rateMessages.push(statObject);
let envelope = mail.data.envelope || mail.message.getEnvelope();
let messageId = mail.message.messageId();
let recipients = [].concat(envelope.to || []);
if (recipients.length > 3) {
recipients.push('...and ' + recipients.splice(2).length + ' more');
}
this.logger.info({
tnx: 'send',
messageId
}, 'Sending message %s to <%s>', messageId, recipients.join(', '));
let getRawMessage = next => {
// do not use Message-ID and Date in DKIM signature
if (!mail.data._dkim) {
mail.data._dkim = {};
}
if (mail.data._dkim.skipFields && typeof mail.data._dkim.skipFields === 'string') {
mail.data._dkim.skipFields += ':date:message-id';
} else {
mail.data._dkim.skipFields = 'date:message-id';
}
let sourceStream = mail.message.createReadStream();
let stream = sourceStream.pipe(new LeWindows());
let chunks = [];
let chunklen = 0;
stream.on('readable', () => {
let chunk;
while ((chunk = stream.read()) !== null) {
chunks.push(chunk);
chunklen += chunk.length;
}
});
sourceStream.once('error', err => stream.emit('error', err));
stream.once('error', err => {
next(err);
});
stream.once('end', () => next(null, Buffer.concat(chunks, chunklen)));
};
setImmediate(() => getRawMessage((err, raw) => {
if (err) {
this.logger.error({
err,
tnx: 'send',
messageId
}, 'Failed creating message for %s. %s', messageId, err.message);
statObject.pending = false;
return callback(err);
}
let sesMessage = {
RawMessage: { // required
Data: raw // required
},
Source: envelope.from,
Destinations: envelope.to
};
Object.keys(mail.data.ses || {}).forEach(key => {
sesMessage[key] = mail.data.ses[key];
});
this.ses.sendRawEmail(sesMessage, (err, data) => {
if (err) {
this.logger.error({
err,
tnx: 'send'
}, 'Send error for %s: %s', messageId, err.message);
statObject.pending = false;
return callback(err);
}
let region = this.ses.config && this.ses.config.region || 'us-east-1';
if (region === 'us-east-1') {
region = 'email';
}
statObject.pending = false;
callback(null, {
envelope: {
from: envelope.from,
to: envelope.to
},
messageId: '<' + data.MessageId + (!/@/.test(data.MessageId) ? '@' + region + '.amazonses.com' : '') + '>',
response: data.MessageId
});
});
}));
}
/**
* Verifies SES configuration
*
* @param {Function} callback Callback function
*/
verify(callback) {
let promise;
if (!callback && typeof Promise === 'function') {
promise = new Promise((resolve, reject) => {
callback = shared.callbackPromise(resolve, reject);
});
}
this.ses.sendRawEmail({
RawMessage: { // required
Data: 'From: invalid@invalid\r\nTo: invalid@invalid\r\n Subject: Invalid\r\n\r\nInvalid'
},
Source: 'invalid@invalid',
Destinations: ['invalid@invalid']
}, err => {
if (err && err.code !== 'InvalidParameterValue') {
return callback(err);
}
return callback(null, true);
});
return promise;
}
}
module.exports = SESTransport;