blob: 8e3155eea9c76620255f2d60978a97a159b94599 [file] [log] [blame]
'use strict';
const EventEmitter = require('events');
const PoolResource = require('./pool-resource');
const SMTPConnection = require('../smtp-connection');
const wellKnown = require('../well-known');
const shared = require('../shared');
const packageData = require('../../package.json');
/**
 * Creates a SMTP pool transport object for Nodemailer.
 *
 * Messages handed to send() are queued and assigned to pool resources
 * (PoolResource instances), of which at most `options.maxConnections` exist
 * at any time. The pool emits an 'idle' event when it starts out idle, when
 * the queue runs empty and when the queue length drops below
 * `options.maxConnections`.
 *
 * Defaults applied by the constructor: maxConnections = 5, maxMessages = 100,
 * rateDelta = 1000, rateLimit = 0 (rate limiting disabled).
 *
 * @constructor
 * @param {Object|String} options SMTP Connection options or a connection url
 */
class SMTPPool extends EventEmitter {
    constructor(options) {
        super();

        options = options || {};
        if (typeof options === 'string') {
            // a bare string is treated as a connection url
            options = {
                url: options
            };
        }

        let urlData;
        let service = options.service;

        if (typeof options.getSocket === 'function') {
            // a caller supplied socket factory shadows the placeholder getSocket() method
            this.getSocket = options.getSocket;
        }

        if (options.url) {
            urlData = shared.parseConnectionUrl(options.url);
            service = service || urlData.service;
        }

        this.options = shared.assign(
            false, // create new object
            options, // regular options
            urlData, // url options
            service && wellKnown(service) // wellknown options
        );

        this.options.maxConnections = this.options.maxConnections || 5;
        this.options.maxMessages = this.options.maxMessages || 100;

        this.logger = shared.getLogger(this.options, {
            component: this.options.component || 'smtp-pool'
        });

        // temporary object, only used for reading the client version
        const connection = new SMTPConnection(this.options);

        this.name = 'SMTP (pool)';
        this.version = packageData.version + '[client:' + connection.version + ']';

        // state shared by all pool resources for rate limiting
        this._rateLimit = {
            counter: 0, // messages assigned since the last reset
            timeout: null, // timer that resets the limiter
            waiting: [], // callbacks paused until the limiter is reset
            checkpoint: false, // timestamp of the current rate window, false if not started
            delta: Number(this.options.rateDelta) || 1000,
            limit: Number(this.options.rateLimit) || 0
        };

        this._closed = false;
        this._queue = [];
        this._connections = [];
        this._connectionCounter = 0;

        this.idling = true;

        // deferred, so that listeners attached right after construction get notified
        setImmediate(() => {
            if (this.idling) {
                this.emit('idle');
            }
        });
    }

    /**
     * Placeholder function for creating proxy sockets. This method immediately returns
     * without a socket
     *
     * @param {Object} options Connection options
     * @param {Function} callback Callback function to run with the socket keys
     * @returns {Object} The handle returned by setImmediate
     */
    getSocket(options, callback) {
        // return immediately
        return setImmediate(() => callback(null, false));
    }

    /**
     * Queues an e-mail to be sent using the selected settings
     *
     * Note that if the pool is already closed, the message is not queued and
     * the callback is NOT invoked; the only signal is the `false` return value.
     *
     * @param {Object} mail Mail object
     * @param {Function} callback Callback function
     * @returns {Boolean} true if the message was queued, false if the pool is closed
     */
    send(mail, callback) {
        if (this._closed) {
            return false;
        }

        this._queue.push({
            mail,
            callback
        });

        // the queue holds at least one message per allowed connection, stop asking for more
        if (this.idling && this._queue.length >= this.options.maxConnections) {
            this.idling = false;
        }

        setImmediate(() => this._processMessages());

        return true;
    }

    /**
     * Closes all connections in the pool. If there is a message being sent, the connection
     * is closed later. Callbacks of messages still waiting in the queue are invoked
     * with a 'Connection pool was closed' error, one queue entry per event loop turn.
     */
    close() {
        const len = this._connections.length;
        this._closed = true;

        // clear rate limit timer if it exists
        clearTimeout(this._rateLimit.timeout);

        if (!len && !this._queue.length) {
            return;
        }

        // remove all available connections. Iterate backwards and re-check the slot on
        // every step, as the connections list might shrink while resources get closed
        for (let i = len - 1; i >= 0; i--) {
            const connection = this._connections[i];
            if (connection && connection.available) {
                connection.close();
                this.logger.info(
                    {
                        tnx: 'connection',
                        cid: connection.id,
                        action: 'removed'
                    },
                    'Connection #%s removed',
                    connection.id
                );
            }
        }

        if (len && !this._connections.length) {
            this.logger.debug(
                {
                    tnx: 'connection'
                },
                'All connections removed'
            );
        }

        if (!this._queue.length) {
            return;
        }

        // make sure that entire queue would be cleaned
        const invokeCallbacks = () => {
            if (!this._queue.length) {
                this.logger.debug(
                    {
                        tnx: 'connection'
                    },
                    'Pending queue entries cleared'
                );
                return;
            }

            const entry = this._queue.shift();
            if (entry && typeof entry.callback === 'function') {
                try {
                    entry.callback(new Error('Connection pool was closed'));
                } catch (E) {
                    // A queued entry is not bound to any connection, so there is no
                    // connection id to report. Do not touch connection state here,
                    // otherwise a throwing callback would abort the queue cleanup
                    this.logger.error(
                        {
                            err: E,
                            tnx: 'callback'
                        },
                        'Callback error while clearing pending queue: %s',
                        E.message
                    );
                }
            }

            setImmediate(invokeCallbacks);
        };

        setImmediate(invokeCallbacks);
    }

    /**
     * Check the queue and available connections. If there is a message to be sent and there is
     * an available connection, then use this connection to send the mail
     */
    _processMessages() {
        // do nothing if already closed
        if (this._closed) {
            return;
        }

        // do nothing if queue is empty
        if (!this._queue.length) {
            if (!this.idling) {
                // no pending jobs
                this.idling = true;
                this.emit('idle');
            }
            return;
        }

        // find first available connection
        let connection = this._connections.find(candidate => candidate.available);

        // nothing available, create a new resource if the pool is not full yet
        if (!connection && this._connections.length < this.options.maxConnections) {
            connection = this._createConnection();
        }

        if (!connection) {
            // no more free connection slots available
            this.idling = false;
            return;
        }

        // check if there is free space in the processing queue
        if (!this.idling && this._queue.length < this.options.maxConnections) {
            this.idling = true;
            this.emit('idle');
        }

        const entry = this._queue.shift();
        connection.queueEntry = entry;
        // Message-ID header value without the surrounding brackets and whitespace
        entry.messageId = (entry.mail.message.getHeader('message-id') || '').replace(/[<>\s]/g, '');

        connection.available = false;

        this.logger.debug(
            {
                tnx: 'pool',
                cid: connection.id,
                messageId: entry.messageId,
                action: 'assign'
            },
            'Assigned message <%s> to #%s (%s)',
            entry.messageId,
            connection.id,
            connection.messages + 1
        );

        if (this._rateLimit.limit) {
            this._rateLimit.counter++;
            if (!this._rateLimit.checkpoint) {
                // first message of a new rate window
                this._rateLimit.checkpoint = Date.now();
            }
        }

        connection.send(entry.mail, (err, info) => {
            // only process callback if current handler is not changed
            if (entry === connection.queueEntry) {
                try {
                    entry.callback(err, info);
                } catch (E) {
                    this.logger.error(
                        {
                            err: E,
                            tnx: 'callback',
                            cid: connection.id
                        },
                        'Callback error for #%s: %s',
                        connection.id,
                        E.message
                    );
                }
                connection.queueEntry = false;
            }
        });
    }

    /**
     * Creates a new pool resource, wires up its 'available', 'error' and 'close'
     * handlers and adds it to the connections list
     *
     * @returns {Object} The created PoolResource
     */
    _createConnection() {
        const connection = new PoolResource(this);

        connection.id = ++this._connectionCounter;

        this.logger.info(
            {
                tnx: 'pool',
                cid: connection.id,
                action: 'conection'
            },
            'Created new pool resource #%s',
            connection.id
        );

        // resource comes available
        connection.on('available', () => {
            this.logger.debug(
                {
                    tnx: 'connection',
                    cid: connection.id,
                    action: 'available'
                },
                'Connection #%s became available',
                connection.id
            );

            if (this._closed) {
                // if already closed run close() that will remove this connections from connections list
                this.close();
            } else {
                // check if there's anything else to send
                this._processMessages();
            }
        });

        // resource is terminated with an error
        connection.once('error', err => {
            if (err.code !== 'EMAXLIMIT') {
                this.logger.error(
                    {
                        err,
                        tnx: 'pool',
                        cid: connection.id
                    },
                    'Pool Error for #%s: %s',
                    connection.id,
                    err.message
                );
            } else {
                this.logger.debug(
                    {
                        tnx: 'pool',
                        cid: connection.id,
                        action: 'maxlimit'
                    },
                    'Max messages limit exchausted for #%s',
                    connection.id
                );
            }

            // report the error to the message that was being processed, if any
            if (connection.queueEntry) {
                try {
                    connection.queueEntry.callback(err);
                } catch (E) {
                    this.logger.error(
                        {
                            err: E,
                            tnx: 'callback',
                            cid: connection.id
                        },
                        'Callback error for #%s: %s',
                        connection.id,
                        E.message
                    );
                }
                connection.queueEntry = false;
            }

            // remove the erroneous connection from connections list
            this._removeConnection(connection);

            this._continueProcessing();
        });

        connection.once('close', () => {
            this.logger.info(
                {
                    tnx: 'connection',
                    cid: connection.id,
                    action: 'closed'
                },
                'Connection #%s was closed',
                connection.id
            );

            this._removeConnection(connection);

            if (connection.queueEntry) {
                // If the connection closed when sending, add the message to the queue again
                // Note that we must wait a bit.. because the callback of the 'error' handler might be called
                // in the next event loop
                setTimeout(() => {
                    // the 'error' handler resets queueEntry, in that case there is nothing to re-queue
                    if (connection.queueEntry) {
                        this.logger.debug(
                            {
                                tnx: 'pool',
                                cid: connection.id,
                                messageId: connection.queueEntry.messageId,
                                action: 'requeue'
                            },
                            'Re-queued message <%s> for #%s',
                            connection.queueEntry.messageId,
                            connection.id
                        );
                        // put the message back to the front of the queue
                        this._queue.unshift(connection.queueEntry);
                        connection.queueEntry = false;
                    }
                    this._continueProcessing();
                }, 50);
            } else {
                this._continueProcessing();
            }
        });

        this._connections.push(connection);

        return connection;
    }

    /**
     * Continue to process message if the pool hasn't closed. For a closed pool
     * close() is run again instead, for an open pool the queue is checked after 100ms
     */
    _continueProcessing() {
        if (this._closed) {
            this.close();
        } else {
            setTimeout(() => this._processMessages(), 100);
        }
    }

    /**
     * Remove resource from pool. Does nothing if the resource is not listed
     *
     * @param {Object} connection The PoolResource to remove
     */
    _removeConnection(connection) {
        const index = this._connections.indexOf(connection);

        if (index !== -1) {
            this._connections.splice(index, 1);
        }
    }

    /**
     * Checks if connections have hit current rate limit and if so, queues the availability callback
     *
     * The callback is run synchronously if rate limiting is disabled or the limit is not
     * reached yet. Otherwise it is parked until the limiter is reset: right away if the
     * current rate window has already passed, or by a timer armed for the remainder of it.
     *
     * @param {Function} callback Callback function to run once rate limiter has been cleared
     */
    _checkRateLimit(callback) {
        if (!this._rateLimit.limit) {
            return callback();
        }

        const now = Date.now();

        if (this._rateLimit.counter < this._rateLimit.limit) {
            return callback();
        }

        this._rateLimit.waiting.push(callback);

        if (this._rateLimit.checkpoint <= now - this._rateLimit.delta) {
            // the rate window is over, reset the limiter and resume waiting callbacks
            return this._clearRateLimit();
        } else if (!this._rateLimit.timeout) {
            // wait for the remaining part of the rate window
            this._rateLimit.timeout = setTimeout(() => this._clearRateLimit(), this._rateLimit.delta - (now - this._rateLimit.checkpoint));
            this._rateLimit.checkpoint = now;
        }
    }

    /**
     * Clears current rate limit limitation and runs paused callback
     */
    _clearRateLimit() {
        clearTimeout(this._rateLimit.timeout);
        this._rateLimit.timeout = null;
        this._rateLimit.counter = 0;
        this._rateLimit.checkpoint = false;

        // resume all paused connections
        while (this._rateLimit.waiting.length) {
            const cb = this._rateLimit.waiting.shift();
            setImmediate(cb);
        }
    }

    /**
     * Returns true if there are free slots in the queue
     *
     * @returns {Boolean} Current value of the `idling` flag
     */
    isIdle() {
        return this.idling;
    }

    /**
     * Verifies SMTP configuration by opening a standalone connection (not a pooled one)
     * and, if authentication data is available, logging in with it
     *
     * @param {Function} [callback] Callback function, called with (err) or (null, true)
     * @returns {Promise|undefined} A promise if no callback was provided, otherwise undefined
     */
    verify(callback) {
        let promise;

        if (!callback && typeof Promise === 'function') {
            promise = new Promise((resolve, reject) => {
                callback = shared.callbackPromise(resolve, reject);
            });
        }

        // a throwaway resource, only used for reading the `auth` value it exposes
        const auth = new PoolResource(this).auth;

        this.getSocket(this.options, (err, socketOptions) => {
            if (err) {
                return callback(err);
            }

            let options = this.options;
            if (socketOptions && socketOptions.connection) {
                this.logger.info(
                    {
                        tnx: 'proxy',
                        remoteAddress: socketOptions.connection.remoteAddress,
                        remotePort: socketOptions.connection.remotePort,
                        destHost: options.host || '',
                        destPort: options.port || '',
                        action: 'connected'
                    },
                    'Using proxied socket from %s:%s to %s:%s',
                    socketOptions.connection.remoteAddress,
                    socketOptions.connection.remotePort,
                    options.host || '',
                    options.port || ''
                );

                // copy the options before merging socket keys, so this.options stays untouched
                options = shared.assign(false, options);
                Object.keys(socketOptions).forEach(key => {
                    options[key] = socketOptions[key];
                });
            }

            const connection = new SMTPConnection(options);
            // guards against running the callback more than once
            let returned = false;

            connection.once('error', err => {
                if (returned) {
                    return;
                }
                returned = true;
                connection.close();
                return callback(err);
            });

            connection.once('end', () => {
                if (returned) {
                    return;
                }
                returned = true;
                return callback(new Error('Connection closed'));
            });

            const finalize = () => {
                if (returned) {
                    return;
                }
                returned = true;
                connection.quit();
                return callback(null, true);
            };

            connection.connect(() => {
                if (returned) {
                    return;
                }

                if (auth) {
                    connection.login(auth, err => {
                        if (returned) {
                            return;
                        }

                        if (err) {
                            returned = true;
                            connection.close();
                            return callback(err);
                        }

                        finalize();
                    });
                } else {
                    finalize();
                }
            });
        });

        return promise;
    }
}
// expose to the world: the SMTPPool class is the sole export of this module
module.exports = SMTPPool;