// blob: 4e68097d6bd987492e3722498a94999ed308e00b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/**
* This file is essentially a "module" that is common to msgr-send.js and msgr-send.html.
* It defines the Statistics and MessengerSend classes and if the environment is Node.js
* it will import qpid-proton-messenger and export MessengerSend for use in msgr-send.js.
* Because it's really a module/library, trying to execute msgr-send-common.js
* by itself won't do anything terribly exciting.
*/
/**
 * Accumulates elapsed-time and message-latency figures for a send run and
 * prints a summary via console.log. All times are in milliseconds since the
 * epoch, as produced by coercing a Date with unary +.
 */
var Statistics = function() {
    this.startTime = 0;      // Set by start(); stays 0 until start() is called.
    this.latencySamples = 0; // Number of latency measurements accepted so far.
    this.latencyTotal = 0;   // Sum of accepted latencies (ms), used for the average.
    this.latencyMin = 0;     // Smallest accepted latency (ms); valid once latencySamples > 0.
    this.latencyMax = 0;     // Largest accepted latency (ms); valid once latencySamples > 0.
};

/**
 * Records the current time as the start of the measured interval.
 */
Statistics.prototype.start = function() {
    this.startTime = +new Date();
};

/**
 * Folds the latency of one received message into the running totals.
 * Latency is the current time minus the message's creation time.
 *
 * Messages whose creation time is unset (coerces to 0 or NaN) are ignored, as
 * are non-positive latencies: a creation time ahead of the local clock would
 * otherwise drag the reported minimum and average below zero.
 *
 * @param {Object} msg - any object exposing getCreationTime(), whose result
 *     is coerced to a number of milliseconds with unary +.
 */
Statistics.prototype.messageReceived = function(msg) {
    var ts = +msg.getCreationTime(); // The + gets the numeric value of the returned Date object.
    if (!ts) {
        return;
    }

    var latency = +new Date() - ts;
    if (!(latency > 0)) {
        return;
    }

    this.latencyTotal += latency;
    this.latencySamples += 1;
    if (this.latencySamples === 1) {
        // The first sample defines both extremes.
        this.latencyMin = this.latencyMax = latency;
    } else {
        if (this.latencyMin > latency) {
            this.latencyMin = latency;
        }
        if (this.latencyMax < latency) {
            this.latencyMax = latency;
        }
    }
};

/**
 * Prints message counts, elapsed time, throughput and (when at least one
 * latency sample was accepted) latency min/max/avg to the console.
 *
 * @param {number} sent - number of messages sent.
 * @param {number} received - number of messages received.
 */
Statistics.prototype.report = function(sent, received) {
    var seconds = (+new Date() - this.startTime) / 1000;
    console.log("Messages sent: " + sent + " received: " + received);
    console.log("Total time: " + seconds + " seconds");
    // Skip throughput when no measurable time has elapsed (avoids dividing by zero).
    if (seconds) {
        console.log("Throughput: " + (sent / seconds) + " msgs/sec sent");
        console.log("Throughput: " + (received / seconds) + " msgs/sec received");
    }
    if (this.latencySamples) {
        console.log("Latency (ms): " + this.latencyMin + " min " +
                    this.latencyMax + " max " +
                    (this.latencyTotal / this.latencySamples) + " avg");
    }
};
/**
 * Sends messages to one or more target addresses using a proton Messenger,
 * optionally collecting replies, and reports statistics when finished.
 *
 * Relies on `proton` and `Statistics` being in scope when the constructor runs.
 *
 * @param {Object} opts - run options. Fields read here:
 *     addresses (comma-separated string of targets), messageCount (0 means
 *     keep sending without limit), messageSize, recvCount, sendBatch,
 *     outgoingWindow, reportInterval (only logged), getReplies, name, verbose.
 * @param {function(Statistics)=} callback - if supplied, called once with the
 *     Statistics object right after the messenger has been asked to stop.
 */
var MessengerSend = function(opts, callback) {
    // NOTE: the option dump below is printed unconditionally; the
    // opts.verbose guard around it is commented out.
    //if (opts.verbose) {
    console.log("addresses = " + opts.addresses);
    console.log("messageCount = " + opts.messageCount);
    console.log("messageSize = " + opts.messageSize);
    console.log("recvCount = " + opts.recvCount);
    console.log("sendBatch = " + opts.sendBatch);
    console.log("outgoingWindow = " + opts.outgoingWindow);
    console.log("reportInterval = " + opts.reportInterval);
    console.log("getReplies = " + opts.getReplies);
    console.log("name = " + opts.name);
    console.log("verbose = " + opts.verbose);
    console.log();
    //}

    var stats = new Statistics();
    var targets = [];
    var running = true; // Used to avoid calling stop multiple times.
    var sent = 0;
    var received = 0;

    var message = new proton.Message();
    var replyMessage = new proton.Message();
    var messenger = new proton.Messenger(opts.name);

    // True while there are still messages left to send. A messageCount of 0
    // means "no limit"; the error handler in start() sets it to -1 so that
    // this becomes false and the exit path runs.
    var moreToSend = function() {
        return opts.messageCount === 0 || (sent < opts.messageCount);
    };

    // Retrieve replies and return the number of reply messages received.
    var processReplies = function() {
        var replyCount = 0;
        if (opts.verbose) {
            console.log("Calling messenger.recv(" + opts.recvCount + ")");
        }
        messenger.recv(opts.recvCount);
        if (opts.verbose) {
            console.log("Messages on incoming queue: " + messenger.incoming());
        }
        while (messenger.incoming()) {
            messenger.get(replyMessage);
            replyCount += 1;
            stats.messageReceived(replyMessage);
        }
        return replyCount;
    };

    // Send messages as fast as possible. This is analogous to the while loop in
    // the Python msgr-send.py but we wrap in a function in JavaScript so that
    // we can schedule on the JavaScript Event queue via setTimeout. This is needed
    // otherwise the JavaScript Event loop is blocked and no data gets sent.
    var sendData = function() {
        var delay = 0;
        while (moreToSend()) {
            // Check the amount of data buffered on the socket, if it's non-zero
            // exit the loop and call sendData again after a short delay. This
            // will throttle the send rate if necessary.
            if (messenger.getBufferedAmount()) {
                console.log("messenger.getBufferedAmount() = " + messenger.getBufferedAmount());
                delay = 100;
                break; // Exit loop to check for exit condition and schedule to Event queue.
            }

            // Distribute messages round-robin across the targets.
            var index = sent % targets.length;
            message.setAddress(targets[index]);
            message.setCorrelationID(sent);
            message.setCreationTime(new Date());
            messenger.put(message);
            sent += 1;

            if (opts.sendBatch && (messenger.outgoing() >= opts.sendBatch)) {
                if (opts.verbose) {
                    console.log("Calling messenger.send()");
                }
                messenger.send();
                if (opts.getReplies) {
                    received += processReplies();
                }
                break; // Exit loop to check for exit condition and yield to Event loop.
            }
        }

        // Check for exit condition. Each pass does at most one of: drain
        // replies, flush pending sends, or stop and report.
        if (running && !moreToSend()) {
            if (opts.getReplies && (received < sent)) {
                received += processReplies();
                if (opts.verbose) {
                    console.log("Messages sent = " + sent + ", received = " + received);
                }
            } else if (messenger.outgoing()) {
                if (opts.verbose) {
                    console.log("Flushing pending sends");
                }
                messenger.send();
            } else {
                messenger.stop();
                running = false;
                stats.report(sent, received);
                if (callback) {
                    callback(stats);
                }
            }
        }

        if (messenger.isStopped()) {
            // Release everything this sender allocated. replyMessage is created
            // alongside message above, so it must be freed here as well.
            message.free();
            replyMessage.free();
            messenger.free();
        } else {
            // schedule next call on the JavaScript Event queue. If we don't do this
            // our messages won't get sent because none of the internal JavaScript
            // network code will get any CPU.
            // If available we call setImmediate rather than setTimeout when the delay
            // is zero. setImmediate is more efficient, in particular I noticed that
            // with Node.js v0.10.18 I could get max throughput and max out CPU using
            // setTimeout, but when I upgraded to v0.10.33 my throughput dropped and
            // my CPU was hovering around 55% but using setImmediate the performance
            // improved again. My guess is that v0.10.18 was checking for zero delay
            // and calling setImmediate internally whereas v0.10.33 wasn't, but I
            // can't say for sure. TODO it's possible that some browsers might do a
            // better job with setImmediate too (given what I'm seeing with Node.js),
            // Chrome might be one such case, but it's not universally supported.
            // It might be worth adding a proper polyfill to handle this.
            if (delay === 0 && typeof setImmediate === 'function') {
                setImmediate(sendData);
            } else {
                setTimeout(sendData, delay);
            }
        }
    };

    /**
     * Prepares the outgoing message, starts the messenger and begins sending.
     */
    this.start = function() {
        // Build a body of opts.messageSize 'X' characters.
        message.body = Array(+opts.messageSize + 1).join('X');
        message.setReplyTo('~');

        messenger.on('error', function(error) {
            console.log(error);
            opts.messageCount = -1; // Force exit condition.
        });

        if (opts.outgoingWindow) {
            messenger.setOutgoingWindow(opts.outgoingWindow);
        }
        messenger.start();

        // Unpack targets that were specified using comma-separated list
        var addresses = opts.addresses.split(',');
        for (var i = 0; i < addresses.length; i++) {
            targets.push(addresses[i]);
        }

        stats.start();
        sendData();
    };
};
// If running in Node.js import the proton library and export MessengerSend.
// The check on `module` is how a CommonJS environment is detected here; when it
// is absent, `proton` must already be in scope by the time MessengerSend is
// constructed (presumably supplied by the hosting page - confirm against
// msgr-send.html).
if (typeof module === 'object') {
// `var` (not a block-scoped declaration) is deliberate: it hoists `proton` to
// file scope, which is where MessengerSend above looks it up.
var proton = require("qpid-proton-messenger");
module.exports.MessengerSend = MessengerSend;
}