| 'use strict'; |
| |
| const EventEmitter = require('events'); |
| const PoolResource = require('./pool-resource'); |
| const SMTPConnection = require('../smtp-connection'); |
| const wellKnown = require('../well-known'); |
| const shared = require('../shared'); |
| const packageData = require('../../package.json'); |
| |
/**
 * Creates a SMTP pool transport object for Nodemailer.
 *
 * The pool keeps up to `options.maxConnections` PoolResource objects and hands queued
 * messages to whichever resource reports itself as available. It emits `idle` whenever
 * the internal queue has room for more messages.
 *
 * @constructor
 * @param {Object|String} options SMTP Connection options or a connection url
 */
class SMTPPool extends EventEmitter {
    constructor(options) {
        super();

        options = options || {};
        if (typeof options === 'string') {
            // a plain string is treated as a connection url
            options = {
                url: options
            };
        }

        let urlData;
        let service = options.service;

        // allow callers to override the socket factory (see the getSocket placeholder below)
        if (typeof options.getSocket === 'function') {
            this.getSocket = options.getSocket;
        }

        if (options.url) {
            urlData = shared.parseConnectionUrl(options.url);
            service = service || urlData.service;
        }

        this.options = shared.assign(
            false, // create new object
            options, // regular options
            urlData, // url options
            service && wellKnown(service) // wellknown options
        );

        // falsy values (including 0) fall back to the defaults
        this.options.maxConnections = this.options.maxConnections || 5;
        this.options.maxMessages = this.options.maxMessages || 100;

        this.logger = shared.getLogger(this.options, {
            component: this.options.component || 'smtp-pool'
        });

        // temporary object, only used to read the client version
        const connection = new SMTPConnection(this.options);

        this.name = 'SMTP (pool)';
        this.version = packageData.version + '[client:' + connection.version + ']';

        this._rateLimit = {
            counter: 0, // messages assigned since the last reset
            timeout: null, // timer that clears the limit
            waiting: [], // callbacks paused by the limiter
            checkpoint: false, // timestamp of the first counted message, false when unset
            delta: Number(this.options.rateDelta) || 1000,
            limit: Number(this.options.rateLimit) || 0 // 0 disables rate limiting
        };
        this._closed = false;
        this._queue = [];
        this._connections = [];
        this._connectionCounter = 0;

        this.idling = true;

        // announce idle state asynchronously so that listeners can be attached first
        setImmediate(() => {
            if (this.idling) {
                this.emit('idle');
            }
        });
    }

    /**
     * Placeholder function for creating proxy sockets. This method immediately returns
     * without a socket
     *
     * @param {Object} options Connection options
     * @param {Function} callback Callback function to run with the socket keys
     */
    getSocket(options, callback) {
        // return immediately
        return setImmediate(() => callback(null, false));
    }

    /**
     * Queues an e-mail to be sent using the selected settings
     *
     * NB! If the pool is already closed then the message is not queued and
     * the callback is NOT invoked; the only signal is the `false` return value.
     *
     * @param {Object} mail Mail object
     * @param {Function} callback Callback function
     * @returns {Boolean} true if the message was queued, false if the pool is closed
     */
    send(mail, callback) {
        if (this._closed) {
            return false;
        }

        this._queue.push({
            mail,
            callback
        });

        // the queue is considered full once it holds at least maxConnections entries
        if (this.idling && this._queue.length >= this.options.maxConnections) {
            this.idling = false;
        }

        setImmediate(() => this._processMessages());

        return true;
    }

    /**
     * Closes all connections in the pool. If there is a message being sent, the connection
     * is closed later. Callbacks of messages still waiting in the queue are invoked
     * asynchronously with a 'Connection pool was closed' error.
     */
    close() {
        const len = this._connections.length;
        this._closed = true;

        // clear rate limit timer if it exists
        clearTimeout(this._rateLimit.timeout);

        if (!len && !this._queue.length) {
            return;
        }

        // remove all available connections, busy ones call close() again once they become available
        for (let i = len - 1; i >= 0; i--) {
            const connection = this._connections[i];
            if (connection && connection.available) {
                connection.close();
                this.logger.info(
                    {
                        tnx: 'connection',
                        cid: connection.id,
                        action: 'removed'
                    },
                    'Connection #%s removed',
                    connection.id
                );
            }
        }

        if (len && !this._connections.length) {
            this.logger.debug(
                {
                    tnx: 'connection'
                },
                'All connections removed'
            );
        }

        if (!this._queue.length) {
            return;
        }

        // make sure that entire queue would be cleaned, one entry per event loop tick
        const invokeCallbacks = () => {
            if (!this._queue.length) {
                this.logger.debug(
                    {
                        tnx: 'connection'
                    },
                    'Pending queue entries cleared'
                );
                return;
            }
            const entry = this._queue.shift();
            if (entry && typeof entry.callback === 'function') {
                try {
                    entry.callback(new Error('Connection pool was closed'));
                } catch (E) {
                    // Queued entries are not bound to any connection, so there is no connection id
                    // to report here. Referencing one would throw whenever no available connection
                    // was found above and abort draining of the queue.
                    this.logger.error(
                        {
                            err: E,
                            tnx: 'callback'
                        },
                        'Callback error for queued message: %s',
                        E.message
                    );
                }
            }
            setImmediate(invokeCallbacks);
        };
        setImmediate(invokeCallbacks);
    }

    /**
     * Check the queue and available connections. If there is a message to be sent and there is
     * an available connection, then use this connection to send the mail
     */
    _processMessages() {
        // do nothing if already closed
        if (this._closed) {
            return;
        }

        // do nothing if queue is empty
        if (!this._queue.length) {
            if (!this.idling) {
                // no pending jobs
                this.idling = true;
                this.emit('idle');
            }
            return;
        }

        // find first available connection
        let connection = this._connections.find(resource => resource.available);

        // open a new resource if none is free and the pool is not yet full
        if (!connection && this._connections.length < this.options.maxConnections) {
            connection = this._createConnection();
        }

        if (!connection) {
            // no more free connection slots available
            this.idling = false;
            return;
        }

        // check if there is free space in the processing queue
        if (!this.idling && this._queue.length < this.options.maxConnections) {
            this.idling = true;
            this.emit('idle');
        }

        const entry = this._queue.shift();
        connection.queueEntry = entry;
        // strip angle brackets and whitespace, the id is only used for logging
        entry.messageId = (entry.mail.message.getHeader('message-id') || '').replace(/[<>\s]/g, '');

        connection.available = false;

        this.logger.debug(
            {
                tnx: 'pool',
                cid: connection.id,
                messageId: entry.messageId,
                action: 'assign'
            },
            'Assigned message <%s> to #%s (%s)',
            entry.messageId,
            connection.id,
            connection.messages + 1
        );

        if (this._rateLimit.limit) {
            this._rateLimit.counter++;
            if (!this._rateLimit.checkpoint) {
                // first message of the current rate limit window
                this._rateLimit.checkpoint = Date.now();
            }
        }

        connection.send(entry.mail, (err, info) => {
            // only process callback if current handler is not changed
            // (the 'error' and 'close' handlers reset queueEntry once they have dealt with the entry)
            if (entry === connection.queueEntry) {
                try {
                    entry.callback(err, info);
                } catch (E) {
                    this.logger.error(
                        {
                            err: E,
                            tnx: 'callback',
                            cid: connection.id
                        },
                        'Callback error for #%s: %s',
                        connection.id,
                        E.message
                    );
                }
                connection.queueEntry = false;
            }
        });
    }

    /**
     * Creates a new pool resource, wires up its event handlers and adds it to the
     * connections list
     *
     * @returns {Object} The created PoolResource
     */
    _createConnection() {
        const connection = new PoolResource(this);

        connection.id = ++this._connectionCounter;

        this.logger.info(
            {
                tnx: 'pool',
                cid: connection.id,
                action: 'conection'
            },
            'Created new pool resource #%s',
            connection.id
        );

        // resource comes available
        connection.on('available', () => {
            this.logger.debug(
                {
                    tnx: 'connection',
                    cid: connection.id,
                    action: 'available'
                },
                'Connection #%s became available',
                connection.id
            );

            if (this._closed) {
                // if already closed run close() that will remove this connections from connections list
                this.close();
            } else {
                // check if there's anything else to send
                this._processMessages();
            }
        });

        // resource is terminated with an error
        connection.once('error', err => {
            if (err.code !== 'EMAXLIMIT') {
                this.logger.error(
                    {
                        err,
                        tnx: 'pool',
                        cid: connection.id
                    },
                    'Pool Error for #%s: %s',
                    connection.id,
                    err.message
                );
            } else {
                // EMAXLIMIT is logged on debug level only
                this.logger.debug(
                    {
                        tnx: 'pool',
                        cid: connection.id,
                        action: 'maxlimit'
                    },
                    'Max messages limit exchausted for #%s',
                    connection.id
                );
            }

            // fail the message that was assigned to this resource, if any
            if (connection.queueEntry) {
                try {
                    connection.queueEntry.callback(err);
                } catch (E) {
                    this.logger.error(
                        {
                            err: E,
                            tnx: 'callback',
                            cid: connection.id
                        },
                        'Callback error for #%s: %s',
                        connection.id,
                        E.message
                    );
                }
                connection.queueEntry = false;
            }

            // remove the erroneous connection from connections list
            this._removeConnection(connection);

            this._continueProcessing();
        });

        connection.once('close', () => {
            this.logger.info(
                {
                    tnx: 'connection',
                    cid: connection.id,
                    action: 'closed'
                },
                'Connection #%s was closed',
                connection.id
            );

            this._removeConnection(connection);

            if (connection.queueEntry) {
                // If the connection closed when sending, add the message to the queue again
                // Note that we must wait a bit.. because the callback of the 'error' handler might be called
                // in the next event loop
                setTimeout(() => {
                    if (connection.queueEntry) {
                        this.logger.debug(
                            {
                                tnx: 'pool',
                                cid: connection.id,
                                messageId: connection.queueEntry.messageId,
                                action: 'requeue'
                            },
                            'Re-queued message <%s> for #%s',
                            connection.queueEntry.messageId,
                            connection.id
                        );
                        // put the message to the front of the queue
                        this._queue.unshift(connection.queueEntry);
                        connection.queueEntry = false;
                    }
                    this._continueProcessing();
                }, 50);
            } else {
                this._continueProcessing();
            }
        });

        this._connections.push(connection);

        return connection;
    }

    /**
     * Continue to process message if the pool hasn't closed. For a closed pool close()
     * is run again, otherwise queue processing is resumed after a 100ms delay
     */
    _continueProcessing() {
        if (this._closed) {
            this.close();
        } else {
            setTimeout(() => this._processMessages(), 100);
        }
    }

    /**
     * Remove resource from pool. Does nothing if the resource is not listed
     *
     * @param {Object} connection The PoolResource to remove
     */
    _removeConnection(connection) {
        const index = this._connections.indexOf(connection);

        if (index !== -1) {
            this._connections.splice(index, 1);
        }
    }

    /**
     * Checks if connections have hit current rate limit and if so, queues the availability callback
     *
     * @param {Function} callback Callback function to run once rate limiter has been cleared
     */
    _checkRateLimit(callback) {
        // rate limiting is disabled
        if (!this._rateLimit.limit) {
            return callback();
        }

        const now = Date.now();

        // still below the limit
        if (this._rateLimit.counter < this._rateLimit.limit) {
            return callback();
        }

        this._rateLimit.waiting.push(callback);

        if (this._rateLimit.checkpoint <= now - this._rateLimit.delta) {
            // at least delta ms have passed since the checkpoint, release the waiting callbacks
            return this._clearRateLimit();
        } else if (!this._rateLimit.timeout) {
            // schedule the release for the remaining part of the delta period
            this._rateLimit.timeout = setTimeout(() => this._clearRateLimit(), this._rateLimit.delta - (now - this._rateLimit.checkpoint));
            this._rateLimit.checkpoint = now;
        }
    }

    /**
     * Clears current rate limit limitation and runs paused callback
     */
    _clearRateLimit() {
        clearTimeout(this._rateLimit.timeout);
        this._rateLimit.timeout = null;
        this._rateLimit.counter = 0;
        this._rateLimit.checkpoint = false;

        // resume all paused connections
        while (this._rateLimit.waiting.length) {
            const cb = this._rateLimit.waiting.shift();
            setImmediate(cb);
        }
    }

    /**
     * Returns true if there are free slots in the queue
     *
     * @returns {Boolean} Current value of the idling flag
     */
    isIdle() {
        return this.idling;
    }

    /**
     * Verifies SMTP configuration by opening a separate connection and, if authentication
     * data is available, logging in with it
     *
     * @param {Function} [callback] Callback function, called with (err) or (null, true)
     * @returns {Promise|undefined} A promise if callback was not provided
     */
    verify(callback) {
        let promise;

        if (!callback && typeof Promise === 'function') {
            promise = new Promise((resolve, reject) => {
                callback = shared.callbackPromise(resolve, reject);
            });
        }

        // a throwaway resource is only used to read the auth data
        const auth = new PoolResource(this).auth;

        this.getSocket(this.options, (err, socketOptions) => {
            if (err) {
                return callback(err);
            }

            let options = this.options;
            if (socketOptions && socketOptions.connection) {
                this.logger.info(
                    {
                        tnx: 'proxy',
                        remoteAddress: socketOptions.connection.remoteAddress,
                        remotePort: socketOptions.connection.remotePort,
                        destHost: options.host || '',
                        destPort: options.port || '',
                        action: 'connected'
                    },
                    'Using proxied socket from %s:%s to %s:%s',
                    socketOptions.connection.remoteAddress,
                    socketOptions.connection.remotePort,
                    options.host || '',
                    options.port || ''
                );
                // copy the options so that socket keys do not leak into the shared pool options
                options = shared.assign(false, options);
                Object.keys(socketOptions).forEach(key => {
                    options[key] = socketOptions[key];
                });
            }

            const connection = new SMTPConnection(options);
            // guards against running the callback more than once
            let returned = false;

            connection.once('error', err => {
                if (returned) {
                    return;
                }
                returned = true;
                connection.close();
                return callback(err);
            });

            connection.once('end', () => {
                if (returned) {
                    return;
                }
                returned = true;
                return callback(new Error('Connection closed'));
            });

            const finalize = () => {
                if (returned) {
                    return;
                }
                returned = true;
                connection.quit();
                return callback(null, true);
            };

            connection.connect(() => {
                if (returned) {
                    return;
                }

                if (auth) {
                    connection.login(auth, err => {
                        if (returned) {
                            return;
                        }

                        if (err) {
                            returned = true;
                            connection.close();
                            return callback(err);
                        }

                        finalize();
                    });
                } else {
                    finalize();
                }
            });
        });

        return promise;
    }
}
| |
| // expose to the world |
| module.exports = SMTPPool; |