blob: b755f59a1f761ab4bf4be51ea8326041702c667a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* This file defines the Module Object which provides a namespace around the Proton
* Messenger API. The Module object is used extensively by the emscripten runtime,
* however for convenience it is exported with the name "proton" and not "Module".
* <p>
* The emscripten compiled proton-c code and the JavaScript binding code will be
* minified by the Closure compiler, so all comments will be stripped from the
* actual library.
* <p>
* This JavaScript wrapper provides a somewhat more idiomatic object oriented
* interface which abstracts the low-level emscripten based implementation details
* from client code. Any similarities to the Proton Python binding are deliberate.
* @file
* The Module Object is exported by emscripten for all execution platforms, we
* use it as a namespace to allow us to selectively export only what we wish to
* be publicly visible from this package/module, which is wrapped in a closure.
* <p>
* Internally the binding code uses the associative array form for declaring
* exported properties to prevent the Closure compiler from minifying e.g.
* <pre>Module['Messenger'] = ...</pre>
* Exported Objects can however be used in client code using a more convenient
* and obvious proton namespace, e.g.:
* <pre>
* var proton = require('qpid-proton');
* var messenger = new proton.Messenger();
* var message = new proton.Message();
* ...
* </pre>
* The core part of this library is actually proton-c compiled into JavaScript.
* In order to provide C style memory management (malloc/free) emscripten uses
* a "virtual heap", which is actually a pre-allocated ArrayBuffer. The size of
* this virtual heap is set as part of the runtime initialisation and cannot be
* changed subsequently (the default size is 16*1024*1024 = 16777216).
* <p>
* Applications can specify the size of virtual heap that they require via the
* global variable PROTON_TOTAL_MEMORY, this must be set <b>before</b> the library is
* loaded e.g. in Node.js an application would do:
* <pre>
* PROTON_TOTAL_MEMORY = 50000000; // Note no var - it needs to be global.
* var proton = require('qpid-proton');
* ...
* </pre>
* A browser based application would do:
* <pre>
* &lt;script type="text/javascript"&gt;PROTON_TOTAL_MEMORY = 50000000;&lt;/script&gt;
* &lt;script type="text/javascript" src="proton.js">&lt;/script&gt;
* </pre>
* If the global variable PROTON_TOTAL_MEMORY has been set by the application this
* will result in the emscripten heap getting set to the next multiple of
* 16777216 above PROTON_TOTAL_MEMORY.
* <p>
* The global variable PROTON_TOTAL_STACK may be used in a similar way to increase
* the stack size from its default of 5*1024*1024 = 5242880. It is worth noting
* that Strings are allocated on the stack, so you may need this if you end up
* wanting to send very large strings.
* @namespace proton
var Module = {};
if (typeof global === 'object') { // If Node.js
if (global['PROTON_TOTAL_MEMORY']) {
if (global['PROTON_TOTAL_STACK']) {
} else if (typeof window === 'object') { // If browser
if (window['PROTON_TOTAL_MEMORY']) {
if (window['PROTON_TOTAL_STACK']) {
/* */
/* EventDispatch */
/* */
* EventDispatch is a Singleton class that allows callbacks to be registered,
* which will get triggered by the emscripten WebSocket network callbacks.
* Clients of Messenger will register callbacks by calling:
* <pre>
* messenger.on('error', &lt;callback function&gt;);
* messenger.on('work', &lt;callback function&gt;);
* messenger.on('subscription', &lt;callback function&gt;);
* </pre>
* EventDispatch supports callback registration from multiple Messenger instances.
* The client callbacks will actually be called when a given messenger has work
* available or a WebSocket close has been occurred.
* <p>
* The approach implemented here allows the registered callbacks to follow a
* similar pattern to _process_incoming and _process_outgoing in
* @constructor proton.EventDispatch
Module.EventDispatch = new function() { // Note the use of new to create a Singleton.
var POLLIN = 0x001;
var POLLOUT = 0x004;
var _error = null;
var _messengers = {}; // Keyed by name.
var _selectables = {}; // Keyed by file descriptor.
var _initialise = function() {
* Initialises the emscripten network callback functions. This needs
* to be done the first time we call registerMessenger rather than
* when we create the Singleton because emscripten's socket filesystem
* has to be mounted before can listen for any of these events.
Module['websocket']['on']('open', _pump);
Module['websocket']['on']('connection', _pump);
Module['websocket']['on']('message', _pump);
Module['websocket']['on']('close', _closeHandler);
Module['websocket']['on']('error', _errorHandler);
* For Node.js the network code uses the ws WebSocket library, see
* The following is a "Monkey Patch"
* that fixes a problem with Receiver.js where it wasn't checking if
* an Object was null before accessing its properties, so it was
* possible to see errors like:
* TypeError: Cannot read property 'fragmentedOperation' of null
* at Receiver.endPacket (.....node_modules/ws/lib/Receiver.js:224:18)
* This problem is generally seen in Server code after messenger.stop()
* I *think* that the underlying issue is actually because ws calls
* cleanup directly rather than pushing it onto the event loop so the
* this.state stuff gets cleared before the endPacket method is called.
* This fix simply interposes a check to avoid calling endPacket if
* the state has been cleared (i.e. the WebSocket has been closed).
try {
var ws = require('ws');
// Array notation to stop Closure compiler minifying properties we need.
ws['Receiver'].prototype['originalEndPacket'] = ws['Receiver'].prototype['endPacket'];
ws['Receiver'].prototype['endPacket'] = function() {
if (this['state']) {
} catch (e) {
console.error("Failed to apply Monkey Patch to ws WebSocket library");
_initialise = function() {}; // After first call replace with null function.
* Messenger error handling can be a bit inconsistent and in several places
* rather than returning an error code or setting an error it simply writes
* to fprintf. This is something of a Monkey Patch that replaces the emscripten
* library fprintf call with one that checks the message and sets a variable
* if the message is an ERROR. TODO At some point hopefully Dominic Evans'
* patch on Jira PROTON-571 will render this code redundant.
_fprintf = function(stream, format, varargs) {
var array = __formatString(format, varargs);
array.pop(); // Remove the trailing \n
var string = intArrayToString(array); // Convert to native JavaScript string.
if (string.indexOf('ERROR') === -1) { // If not an ERROR just log the message.
} else {
_error = string;
* This method iterates through all registered Messengers and retrieves any
* pending selectables, which are stored in a _selectables map keyed by fd.
var _updateSelectables = function() {
var sel = 0;
var fd = -1;
for (var name in _messengers) {
var messenger = _messengers[name];
while ((sel = _pn_messenger_selectable(messenger._messenger))) {
fd = _pn_selectable_fd(sel);
// Only register valid selectables, otherwise free them.
if (fd === -1) {
} else {
_selectables[fd] = {messenger: messenger, selectable: sel};
return fd; // Return the most recently added selector's file descriptor.
* Continually pump data while there's still work to do.
var _pump = function() {
while (_pumpOnce());
* This method more or less follows the pattern of the pump_once method from
* class Pump in tests/python/proton_tests/ It looks a little
* different because the select/poll implemented here uses some low-level
* emscripten internals (stream = FS.getStream(fd), sock = stream.node.sock,
* mask = sock.sock_ops.poll(sock)). We use the internals so we don't have
* to massage from file descriptors into the C style poll interface.
var _pumpOnce = function() {
var count = 0;
for (var fd in _selectables) {
var selectable = _selectables[fd];
var sel = selectable.selectable;
var terminal = _pn_selectable_is_terminal(sel);
if (terminal) {
delete _selectables[fd];
} else {
var stream = FS.getStream(fd);
if (stream) {
var sock = stream.node.sock;
if (sock.sock_ops.poll) {
var mask = sock.sock_ops.poll(sock); // Low-level poll call.
if (mask) {
var messenger = selectable.messenger;
var capacity = _pn_selectable_capacity(sel) > 0;
var pending = _pn_selectable_pending(sel) > 0;
if ((mask & POLLIN) && capacity) {
//console.log("- readable fd = " + fd + ", capacity = " + _pn_selectable_capacity(sel));
_error = null; // May get set by _pn_selectable_readable.
count++; // Should this be inside the test for _error? Don't know.
var errno = messenger['getErrno']();
_error = errno ? messenger['getError']() : _error;
if (_error) {
_errorHandler([fd, 0, _error]);
} else {
// Don't send work event if it's a listen socket.
if (!sock.server) {
if ((mask & POLLOUT) && pending) {
//console.log("- writeable fd = " + fd + ", pending = " + _pn_selectable_pending(sel));
//TODO looks like this block isn't needed. Need to
//check with a test-case that writes data as fast as
//it can. If not needed then delete.
// Check _selectables again in case the call to
// _pn_selectable_writable caused a socket close.
if (_selectables[fd]) {
return count;
* Handler for the emscripten socket close event.
var _closeHandler = function(fd) {
var selectable = _selectables[fd];
if (selectable) {
// Close and remove the selectable.
var sel = selectable.selectable;
_pn_selectable_free(sel); // This closes the underlying socket too.
delete _selectables[fd];
var messenger = selectable.messenger;
* Handler for the emscripten socket error event.
var _errorHandler = function(error) {
var fd = error[0];
var message = error[2];
var selectable = _selectables[fd];
if (selectable) {
// Close and remove the selectable.
var sel = selectable.selectable;
_pn_selectable_free(sel); // This closes the underlying socket too.
delete _selectables[fd];
var messenger = selectable.messenger;
// Remove any pending Subscriptions whose fd matches the error fd.
var subscriptions = messenger._pendingSubscriptions;
for (var i = 0; i < subscriptions.length; i++) {
subscription = subscriptions[i];
// Use == not === as we don't care if fd is a number or a string.
if (subscription.fd == fd) {
messenger._pendingSubscriptions.splice(i, 1);
if (message.indexOf('EHOSTUNREACH:') === 0) {
message = 'CONNECTION ERROR (' + subscription.source + '): bind: Address already in use';
messenger._emit('error', new Module['SubscriptionError'](subscription.source, message));
messenger._emit('error', new Module['MessengerError'](message));
* Flush any data that has been written by the Messenger put() method.
* @method flush
this.flush = function() {
* Subscribe to a specified source address.
* <p>
* This method is delegated to by the subscribe method of {@link proton.Messenger}.
* We delegate to EventDispatch because we create Subscription objects that
* contain some additional information (such as file descriptors) which are
* only available to EventDispatch and we don't really want to expose to the
* wider API. This low-level information is mainly used for error handling
* which is itself encapsulated in EventDispatch.
* @method subscribe
* @memberof! proton.EventDispatch#
* @param {proton.Messenger} messenger the Messenger instance that this
* subscription relates to.
* @param {string} source the address that we'd like to subscribe to.
this.subscribe = function(messenger, source) {
// First update selectables before subscribing so we can work out the
// Subscription fd (which will be the listen file descriptor).
var sp = Runtime.stackSave();
var subscription = _pn_messenger_subscribe(messenger._messenger,
allocate(intArrayFromString(source), 'i8', ALLOC_STACK));
var fd = _updateSelectables();
subscription = new Subscription(subscription, source, fd);
// For passive subscriptions emit a subscription event (almost) immediately,
// otherwise defer until the address has been resolved remotely.
if (subscription.passive) {
// We briefly delay the call to checkSubscriptions because it is possible
// for passive subscriptions to fail if another process is bound to the
// port specified in the subscription.
var check = function() {messenger._checkSubscriptions();};
setTimeout(check, 10);
return subscription;
* Register the specified Messenger as being interested in network events.
* @method registerMessenger
* @memberof! proton.EventDispatch#
* @param {proton.Messenger} messenger the Messenger instance we want to
* register to receive network events.
this.registerMessenger = function(messenger) {
var name = messenger['getName']();
_messengers[name] = messenger;
// Set the Messenger "passive" as we are supplying our own event loop here.
_pn_messenger_set_passive(messenger._messenger, true);
* Unregister the specified Messenger from interest in network events.
* @method unregisterMessenger
* @memberof! proton.EventDispatch#
* @param {proton.Messenger} messenger the Messenger instance we want to
* unregister from receiving network events.
this.unregisterMessenger = function(messenger) {
var name = messenger['getName']();
delete _messengers[name];