| // |
| // |
| // |
| |
| // General-purpose API for glueing everything together. |
| |
| 'use strict'; |
| |
| var URL = require('url'); |
| var QS = require('querystring'); |
| var Connection = require('./connection').Connection; |
| var fmt = require('util').format; |
| var credentials = require('./credentials'); |
| |
// Shallow-copy every own enumerable property of `obj` onto `target`,
// overwriting any property of the same name, and return `target`.
//
// Keys are visited from the last entry of `Object.keys(obj)` to the
// first, so properties that are new to `target` are created in that
// reversed order.
function copyInto(obj, target) {
  Object.keys(obj).reverse().forEach(function (name) {
    target[name] = obj[name];
  });
  return target;
}
| |
// Return a new plain object holding a shallow copy of `obj`'s own
// enumerable properties. Adapted from util._extend, which is too fringe
// to use.
function clone(obj) {
  var copy = {};
  return copyInto(obj, copy);
}
| |
// Default properties this client reports about itself. openFrames uses
// this object as the prototype of the `clientProperties` field it builds,
// so it must stay writable: assigning a caller-supplied property of the
// same name onto the derived object would throw in strict mode if the
// property were read-only here.
var CLIENT_PROPERTIES = {
  product: 'amqplib',
  version: require('../package.json').version,
  platform: fmt('Node.JS %s', process.version),
  information: 'http://squaremo.github.io/amqp.node',
  // Optional features this client declares support for.
  capabilities: {
    publisher_confirms: true,
    exchange_exchange_bindings: true,
    'basic.nack': true,
    consumer_cancel_notify: true,
    'connection.blocked': true,
    authentication_failure_close: true
  }
};
| |
// Construct the fields of the main frames used in the opening handshake
// (start-ok, tune-ok and open), returned together as one flat object.
//
//   vhost       - virtual host; any truthy value is passed through
//                 QS.unescape, a falsy one selects the default '/'.
//   query       - optional settings: locale, channelMax, frameMax and
//                 heartbeat. The numeric ones may be numbers or strings.
//   credentials - object providing `mechanism` and `response()`.
//   extraClientProperties
//               - optional object whose own properties are layered on
//                 top of CLIENT_PROPERTIES.
function openFrames(vhost, query, credentials, extraClientProperties) {
  vhost = vhost ? QS.unescape(vhost) : '/';
  query = query || {};

  // A setting that is absent (undefined or null) selects the default;
  // parsing null would otherwise yield NaN. The radix is deliberately
  // left for parseInt to infer, so that a hex string such as '0x1000'
  // parses exactly as it did before.
  function intOrDefault(val, def) {
    return (val === undefined || val === null) ? def : parseInt(val);
  }

  // Inherit from the defaults instead of copying them: the caller's
  // entries become own properties that shadow the inherited ones, and
  // CLIENT_PROPERTIES itself is never modified.
  var clientProperties = Object.create(CLIENT_PROPERTIES);

  return {
    // start-ok
    'clientProperties': copyInto(extraClientProperties || {}, clientProperties),
    'mechanism': credentials.mechanism,
    'response': credentials.response(),
    'locale': query.locale || 'en_US',

    // tune-ok
    'channelMax': intOrDefault(query.channelMax, 0),
    'frameMax': intOrDefault(query.frameMax, 0x1000),
    'heartbeat': intOrDefault(query.heartbeat, 0),

    // open
    'virtualHost': vhost,
    'capabilities': '',
    'insist': 0
  };
}
| |
// Work out which credentials to use from a parsed URL (`parts`).
//
// With no `auth` part, fall back to guest/guest. Otherwise everything
// before the first colon is the username and everything after it is the
// password; without a colon the whole of `auth` is the username and the
// password is empty.
//
// Note that in a parsed URL the auth part is already URL-decoded, so
// e.g. '%3a' has already become ':'. That means a separator colon cannot
// be told apart from one that belongs to the username; assume usernames
// contain no colons.
function credentialsFromUrl(parts) {
  var auth = parts.auth;
  if (!auth) {
    return credentials.plain('guest', 'guest');
  }

  var sep = auth.indexOf(':');
  if (sep < 0) {
    return credentials.plain(auth, '');
  }
  return credentials.plain(auth.substring(0, sep), auth.substring(sep + 1));
}
| |
// Open a socket to the server, run the opening handshake over it and
// report the outcome through `openCallback`.
//
//   url           - a URL string ('amqp://...' or 'amqps://...'; defaults
//                   to 'amqp://localhost'), or an object with the fields
//                   protocol, hostname, port, username, password, vhost,
//                   locale, channelMax, frameMax and heartbeat.
//   socketOptions - copied and passed to net.connect / tls.connect. Also
//                   read here: noDelay, timeout, keepAlive,
//                   keepAliveDelay, clientProperties and credentials.
//   openCallback  - called as openCallback(null, connection) once the
//                   handshake succeeds, or openCallback(err) on failure.
//                   It is invoked at most once; only the first outcome
//                   is reported.
//
// Throws synchronously if the protocol is neither amqp: nor amqps:.
function connect(url, socketOptions, openCallback) {
  // tls.connect uses `util._extend()` on the options given it, which
  // copies only properties mentioned in `Object.keys()`, when
  // processing the options. So I have to make copies too, rather
  // than using `Object.create()`.
  var sockopts = clone(socketOptions || {});
  url = url || 'amqp://localhost';

  var noDelay = !!sockopts.noDelay;
  var timeout = sockopts.timeout;
  var keepAlive = !!sockopts.keepAlive;
  // 0 is default for node
  var keepAliveDelay = sockopts.keepAliveDelay || 0;

  var extraClientProperties = sockopts.clientProperties || {};

  var protocol, fields;
  if (typeof url === 'object') {
    protocol = (url.protocol || 'amqp') + ':';
    sockopts.host = url.hostname;
    sockopts.port = url.port || ((protocol === 'amqp:') ? 5672 : 5671);

    var user, pass;
    // Only default if both are missing, to have the same behaviour as
    // the stringly URL.
    if (url.username == null && url.password == null) {
      user = 'guest';
      pass = 'guest';
    } else {
      user = url.username || '';
      pass = url.password || '';
    }

    var config = {
      locale: url.locale,
      channelMax: url.channelMax,
      frameMax: url.frameMax,
      heartbeat: url.heartbeat
    };

    fields = openFrames(url.vhost, config,
                        sockopts.credentials || credentials.plain(user, pass),
                        extraClientProperties);
  } else {
    var parts = URL.parse(url, true); // yes, parse the query string
    protocol = parts.protocol;
    sockopts.host = parts.hostname;
    // A missing port parses to NaN, which selects the protocol default.
    sockopts.port = parseInt(parts.port, 10) ||
      ((protocol === 'amqp:') ? 5672 : 5671);
    // Drop the leading '/' of the path to get the vhost.
    var vhost = parts.pathname ? parts.pathname.substr(1) : null;
    fields = openFrames(vhost, parts.query,
                        sockopts.credentials || credentialsFromUrl(parts),
                        extraClientProperties);
  }

  var sockok = false;
  var sock;

  // The timeout handler, the socket error handler and the handshake
  // callback can each report an outcome independently (for instance the
  // timeout tearing down the socket while the handshake is in flight).
  // Let only the first one through to the caller.
  var reported = false;
  function report(err, conn) {
    if (reported) return;
    reported = true;
    if (err === null) {
      openCallback(null, conn);
    }
    else openCallback(err);
  }

  function onConnect() {
    sockok = true;
    sock.setNoDelay(noDelay);
    if (keepAlive) sock.setKeepAlive(keepAlive, keepAliveDelay);

    var c = new Connection(sock);
    c.open(fields, function(err, ok) {
      // disable timeout once the connection is open, we don't want
      // it fouling things
      if (timeout) sock.setTimeout(0);
      report(err, c);
    });
  }

  if (protocol === 'amqp:') {
    sock = require('net').connect(sockopts, onConnect);
  }
  else if (protocol === 'amqps:') {
    sock = require('tls').connect(sockopts, onConnect);
  }
  else {
    throw new Error("Expected amqp: or amqps: as the protocol; got " + protocol);
  }

  if (timeout) {
    // Applies until the handshake completes: give up, tear the socket
    // down and report a timeout.
    sock.setTimeout(timeout, function() {
      sock.end();
      sock.destroy();
      report(new Error('connect ETIMEDOUT'));
    });
  }

  // Only socket errors raised before the socket has connected are
  // reported from here; later ones are ignored by this handler.
  sock.once('error', function(err) {
    if (!sockok) report(err);
  });

}
| |
// Public interface of this module.
var api = module.exports;
api.connect = connect;
api.credentialsFromUrl = credentialsFromUrl;