blob: daf2a0f8e15a42fecb9513e3b54bee0d9fdd52b5 [file] [log] [blame]
/* jshint node: true, mocha: true */
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
'use strict';
var protocols = require('../lib/protocols'),
utils = require('../lib/utils'),
assert = require('assert'),
stream = require('stream'),
util = require('util');
var HANDSHAKE_REQUEST_TYPE = protocols.HANDSHAKE_REQUEST_TYPE;
var HANDSHAKE_RESPONSE_TYPE = protocols.HANDSHAKE_RESPONSE_TYPE;
var createProtocol = protocols.createProtocol;
suite('protocols', function () {
suite('Protocol', function () {
test('get name and types', function () {
var p = createProtocol({
namespace: 'foo',
protocol: 'HelloWorld',
types: [
{
name: 'Greeting',
type: 'record',
fields: [{name: 'message', type: 'string'}]
},
{
name: 'Curse',
type: 'error',
fields: [{name: 'message', type: 'string'}]
}
],
messages: {
hello: {
request: [{name: 'greeting', type: 'Greeting'}],
response: 'Greeting',
errors: ['Curse']
},
hi: {
request: [{name: 'hey', type: 'string'}],
response: 'null',
'one-way': true
}
}
});
assert.equal(p.getName(), 'foo.HelloWorld');
assert.equal(p.getType('foo.Greeting').getName(true), 'record');
});
test('missing message', function () {
var ptcl = createProtocol({namespace: 'com.acme', protocol: 'Hello'});
assert.throws(function () {
ptcl.on('add', function () {});
}, /unknown/);
});
test('missing name', function () {
assert.throws(function () {
createProtocol({namespace: 'com.acme', messages: {}});
});
});
test('missing type', function () {
assert.throws(function () {
createProtocol({
namespace: 'com.acme',
protocol: 'HelloWorld',
messages: {
hello: {
request: [{name: 'greeting', type: 'Greeting'}],
response: 'Greeting'
}
}
});
});
});
test('get messages', function () {
var ptcl;
ptcl = createProtocol({protocol: 'Empty'});
assert.deepEqual(ptcl.getMessages(), {});
ptcl = createProtocol({
protocol: 'Ping',
messages: {
ping: {
request: [],
response: 'string'
}
}
});
var messages = ptcl.getMessages();
assert.equal(Object.keys(messages).length, 1);
assert(messages.ping !== undefined);
});
test('create listener', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
var transport = new stream.PassThrough();
var ee = ptcl.createListener(transport, function (pending) {
assert.equal(pending, 0);
done();
});
ee.destroy();
});
test('subprotocol', function () {
var ptcl = createProtocol({namespace: 'com.acme', protocol: 'Hello'});
var subptcl = ptcl.subprotocol();
assert.strictEqual(subptcl._emitterResolvers, ptcl._emitterResolvers);
assert.strictEqual(subptcl._listenerResolvers, ptcl._listenerResolvers);
});
test('invalid emitter', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
ptcl.emit('hi', {}, null, function (err) {
assert(/invalid emitter/.test(err.string));
done();
});
});
test('inspect', function () {
var p = createProtocol({
namespace: 'hello',
protocol: 'World',
});
assert.equal(p.inspect(), '<Protocol "hello.World">');
});
});
suite('Message', function () {
var Message = protocols.Message;
test('empty errors', function () {
var m = new Message('Hi', {
request: [{name: 'greeting', type: 'string'}],
response: 'int'
});
assert.deepEqual(m.errorType.toString(), '["string"]');
});
test('missing response', function () {
assert.throws(function () {
new Message('Hi', {
request: [{name: 'greeting', type: 'string'}]
});
});
});
test('invalid one-way', function () {
// Non-null response.
assert.throws(function () {
new Message('Hi', {
request: [{name: 'greeting', type: 'string'}],
response: 'string',
'one-way': true
});
});
// Non-empty errors.
assert.throws(function () {
new Message('Hi', {
request: [{name: 'greeting', type: 'string'}],
response: 'null',
errors: ['int'],
'one-way': true
});
});
});
});
suite('MessageDecoder', function () {
var MessageDecoder = protocols.streams.MessageDecoder;
test('ok', function (done) {
var parts = [
new Buffer([0, 1]),
new Buffer([2]),
new Buffer([]),
new Buffer([3, 4, 5]),
new Buffer([])
];
var messages = [];
var readable = createReadableStream(parts.map(frame), true);
var writable = createWritableStream(messages, true)
.on('finish', function () {
assert.deepEqual(
messages,
[new Buffer([0, 1, 2]), new Buffer([3, 4, 5])]
);
done();
});
readable.pipe(new MessageDecoder()).pipe(writable);
});
test('trailing data', function (done) {
var parts = [
new Buffer([0, 1]),
new Buffer([2]),
new Buffer([]),
new Buffer([3])
];
var messages = [];
var readable = createReadableStream(parts.map(frame), true);
var writable = createWritableStream(messages, true);
readable
.pipe(new MessageDecoder())
.on('error', function () {
assert.deepEqual(messages, [new Buffer([0, 1, 2])]);
done();
})
.pipe(writable);
});
test('empty', function (done) {
var readable = createReadableStream([], true);
readable
.pipe(new MessageDecoder(true))
.on('error', function () { done(); });
});
});
suite('MessageEncoder', function () {
var MessageEncoder = protocols.streams.MessageEncoder;
test('invalid frame size', function () {
assert.throws(function () { new MessageEncoder(); });
});
test('ok', function (done) {
var messages = [
new Buffer([0, 1]),
new Buffer([2])
];
var frames = [];
var readable = createReadableStream(messages, true);
var writable = createWritableStream(frames, true);
readable
.pipe(new MessageEncoder(64))
.pipe(writable)
.on('finish', function () {
assert.deepEqual(
frames,
[
new Buffer([0, 0, 0, 2, 0, 1, 0, 0, 0, 0]),
new Buffer([0, 0, 0, 1, 2, 0, 0, 0, 0])
]
);
done();
});
});
test('all zeros', function (done) {
var messages = [new Buffer([0, 0, 0, 0])];
var frames = [];
var readable = createReadableStream(messages, true);
var writable = createWritableStream(frames, true);
readable
.pipe(new MessageEncoder(64))
.pipe(writable)
.on('finish', function () {
assert.deepEqual(
frames,
[new Buffer([0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0])]
);
done();
});
});
test('short frame size', function (done) {
var messages = [
new Buffer([0, 1, 2]),
new Buffer([2])
];
var frames = [];
var readable = createReadableStream(messages, true);
var writable = createWritableStream(frames, true);
readable
.pipe(new MessageEncoder(2))
.pipe(writable)
.on('finish', function () {
assert.deepEqual(
frames,
[
new Buffer([0, 0, 0, 2, 0, 1, 0, 0, 0, 1, 2, 0, 0, 0, 0]),
new Buffer([0, 0, 0, 1, 2, 0, 0, 0, 0])
]
);
done();
});
});
});
suite('StatefulEmitter', function () {
test('ok handshake', function (done) {
var buf = HANDSHAKE_RESPONSE_TYPE.toBuffer({match: 'BOTH'});
var bufs = [];
var ptcl = createProtocol({protocol: 'Empty'});
var handshake = false;
ptcl.createEmitter(createTransport([buf], bufs))
.on('handshake', function (req, res) {
handshake = true;
assert(res.match === 'BOTH');
assert.deepEqual(
Buffer.concat(bufs),
HANDSHAKE_REQUEST_TYPE.toBuffer({
clientHash: new Buffer(ptcl._hashString, 'binary'),
serverHash: new Buffer(ptcl._hashString, 'binary')
})
);
this.destroy();
})
.on('eot', function () {
assert(handshake);
done();
});
});
test('no server match handshake', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
var resBufs = [
{
match: 'NONE',
serverHash: {'org.apache.avro.ipc.MD5': new Buffer(16)},
serverProtocol: {string: ptcl.toString()},
},
{match: 'BOTH'}
].map(function (val) { return HANDSHAKE_RESPONSE_TYPE.toBuffer(val); });
var reqBufs = [];
var handshakes = 0;
ptcl.createEmitter(createTransport(resBufs, reqBufs))
.on('handshake', function (req, res) {
if (handshakes++) {
assert(res.match === 'BOTH');
this.destroy();
} else {
assert(res.match === 'NONE');
}
})
.on('eot', function () {
assert.equal(handshakes, 2);
done();
});
});
test('incompatible protocol', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
var hash = new Buffer(16); // Pretend the hash was different.
var resBufs = [
{
match: 'NONE',
serverHash: {'org.apache.avro.ipc.MD5': hash},
serverProtocol: {string: ptcl.toString()},
},
{
match: 'NONE',
serverHash: {'org.apache.avro.ipc.MD5': hash},
serverProtocol: {string: ptcl.toString()},
meta: {map: {error: new Buffer('abcd')}}
}
].map(function (val) { return HANDSHAKE_RESPONSE_TYPE.toBuffer(val); });
var error = false;
ptcl.createEmitter(createTransport(resBufs, []))
.on('error', function (err) {
error = true;
assert.equal(err.message, 'abcd');
})
.on('eot', function () {
assert(error);
done();
});
});
test('handshake error', function (done) {
var resBufs = [
new Buffer([4, 0, 0]), // Invalid handshakes.
new Buffer([4, 0, 0])
];
var ptcl = createProtocol({protocol: 'Empty'});
var error = false;
ptcl.createEmitter(createTransport(resBufs, []))
.on('error', function (err) {
error = true;
assert.equal(err.message, 'handshake error');
})
.on('eot', function () {
assert(error);
done();
});
});
test('orphan response', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
var idType = protocols.IdType.createMetadataType();
var resBufs = [
new Buffer([0, 0, 0]), // OK handshake.
idType.toBuffer(23)
];
var error = false;
ptcl.createEmitter(createTransport(resBufs, []))
.on('error', function (err) {
error = true;
assert(/orphan response:/.test(err.message));
})
.on('eot', function () {
assert(error);
done();
});
});
test('ended readable', function (done) {
var bufs = [];
var ptcl = createProtocol({protocol: 'Empty'});
ptcl.createEmitter(createTransport([], bufs))
.on('eot', function () {
assert.equal(bufs.length, 1); // A single handshake was sent.
done();
});
});
test('interrupted', function (done) {
var ptcl = createProtocol({
protocol: 'Empty',
messages: {
id: {request: [{name: 'id', type: 'int'}], response: 'int'}
}
});
var resBufs = [
new Buffer([0, 0, 0]), // OK handshake.
];
var interrupted = 0;
var transport = createTransport(resBufs, []);
var ee = ptcl.createEmitter(transport, function () {
assert.equal(interrupted, 2);
done();
});
ptcl.emit('id', {id: 123}, ee, cb);
ptcl.emit('id', {id: 123}, ee, cb);
function cb(err) {
assert.deepEqual(err, {string: 'interrupted'});
interrupted++;
}
});
test('missing client message', function (done) {
var ptcl1 = createProtocol({
protocol: 'Ping',
messages: {
ping: {request: [], response: 'string'}
}
});
var ptcl2 = createProtocol({
protocol: 'Ping',
messages: {
ping: {request: [], response: 'string'},
pong: {request: [], response: 'string'}
}
}).on('ping', function (req, ee, cb) { cb(null, 'ok'); });
var transports = createPassthroughTransports();
ptcl2.createListener(transports[1]);
var ee = ptcl1.createEmitter(transports[0]);
ptcl1.emit('ping', {}, ee, function (err, res) {
assert.equal(res, 'ok');
done();
});
});
test('missing server message', function (done) {
var ptcl1 = createProtocol({
protocol: 'Ping',
messages: {
ping: {request: [], response: 'string'}
}
});
var ptcl2 = createProtocol({protocol: 'Empty'});
var transports = createPassthroughTransports();
ptcl2.createListener(transports[1]);
ptcl1.createEmitter(transports[0])
.on('error', function (err) {
assert(/missing server message: ping/.test(err.message));
done();
});
});
test('trailing data', function (done) {
var ptcl = createProtocol({
protocol: 'Ping',
messages: {
ping: {request: [], response: 'string'}
}
});
var transports = createPassthroughTransports();
ptcl.createEmitter(transports[0])
.on('error', function (err) {
assert(/trailing data/.test(err.message));
done();
});
transports[0].readable.end(new Buffer([2, 3]));
});
test('invalid metadata', function (done) {
var ptcl = createProtocol({
protocol: 'Ping',
messages: {
ping: {request: [], response: 'string'}
}
});
var transports = createPassthroughTransports();
ptcl.createListener(transports[1]);
ptcl.createEmitter(transports[0])
.on('error', function (err) {
assert(/invalid metadata:/.test(err.message));
done();
})
.on('handshake', function () {
transports[0].readable.write(frame(new Buffer([2, 3])));
transports[0].readable.write(frame(new Buffer(0)));
});
});
test('invalid response', function (done) {
var ptcl = createProtocol({
protocol: 'Ping',
messages: {
ping: {request: [], response: 'string'}
}
});
var transports = createPassthroughTransports();
var ml = ptcl.createListener(transports[1]);
var me = ptcl.createEmitter(transports[0])
.on('handshake', function () {
ml.destroy();
ptcl.emit('ping', {}, me, function (err) {
assert(/invalid response:/.test(err.string));
done();
});
var idType = protocols.IdType.createMetadataType();
var bufs = [
idType.toBuffer(1), // Metadata.
new Buffer([3]) // Invalid response.
];
transports[0].readable.write(frame(Buffer.concat(bufs)));
transports[0].readable.write(frame(new Buffer(0)));
});
});
test('one way', function (done) {
var beats = 0;
var ptcl = createProtocol({
protocol: 'Heartbeat',
messages: {
beat: {request: [], response: 'null', 'one-way': true}
}
}).on('beat', function (req, ee, cb) {
assert.strictEqual(cb, undefined);
if (++beats === 2) {
done();
}
});
var transports = createPassthroughTransports();
ptcl.createListener(transports[1]);
var ee = ptcl.createEmitter(transports[0]);
ptcl.emit('beat', {}, ee);
ptcl.emit('beat', {}, ee);
});
});
suite('StatelessEmitter', function () {
test('interrupted before response data', function (done) {
var ptcl = createProtocol({
protocol: 'Ping',
messages: {ping: {request: [], response: 'boolean'}}
});
var readable = stream.PassThrough()
.on('end', done);
var writable = createWritableStream([]);
var ee = ptcl.createEmitter(function (cb) {
cb(readable);
return writable;
});
ptcl.emit('ping', {}, ee, function (err) {
assert(/interrupted/.test(err.string));
readable.write(frame(new Buffer(2)));
readable.end(frame(new Buffer(0)));
});
ee.destroy(true);
});
});
suite('StatefulListener', function () {
test('end readable', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
var transports = createPassthroughTransports();
ptcl.createListener(transports[0])
.on('eot', function (pending) {
assert.equal(pending, 0);
done();
});
transports[0].readable.end();
});
test('finish writable', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
var transports = createPassthroughTransports();
ptcl.createListener(transports[0])
.on('eot', function (pending) {
assert.equal(pending, 0);
done();
});
transports[0].writable.end();
});
test('invalid handshake', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
var transport = createTransport(
[new Buffer([4])], // Invalid handshake.
[]
);
ptcl.createListener(transport)
.on('handshake', function (req, res) {
assert(!req.$isValid());
assert.equal(res.match, 'NONE');
done();
});
});
test('missing server message', function (done) {
var ptcl1 = createProtocol({protocol: 'Empty'});
var ptcl2 = createProtocol({
protocol: 'Heartbeat',
messages: {beat: {request: [], response: 'boolean'}}
});
var hash = new Buffer(ptcl2._hashString, 'binary');
var req = {
clientHash: hash,
clientProtocol: {string: ptcl2.toString()},
serverHash: hash
};
var transport = createTransport(
[HANDSHAKE_REQUEST_TYPE.toBuffer(req)],
[]
);
ptcl1.createListener(transport)
.on('handshake', function (req, res) {
assert(req.$isValid());
assert.equal(res.match, 'NONE');
var msg = res.meta.map.error.toString();
assert(/missing server message/.test(msg));
done();
});
});
test('invalid metadata', function (done) {
var ptcl = createProtocol({
protocol: 'Heartbeat',
messages: {beat: {request: [], response: 'boolean'}}
});
var transports = createPassthroughTransports();
ptcl.createListener(transports[1])
.on('error', function (err) {
assert(/invalid metadata/.test(err.message));
done();
});
ptcl.createEmitter(transports[0])
.on('handshake', function () {
// Handshake is complete now.
var writable = transports[0].writable;
writable.write(frame(new Buffer([0]))); // Empty metadata.
writable.write(frame(new Buffer(0)));
});
});
test('unknown message', function (done) {
var ptcl = createProtocol({
protocol: 'Heartbeat',
messages: {beat: {request: [], response: 'boolean'}}
});
var transports = createPassthroughTransports();
var ee = ptcl.createListener(transports[1])
.on('eot', function () {
transports[1].writable.end();
});
ptcl.createEmitter(transports[0])
.on('handshake', function () {
// Handshake is complete now.
this.destroy();
var idType = ee._idType;
var bufs = [];
transports[0].readable
.pipe(new protocols.streams.MessageDecoder())
.on('data', function (buf) { bufs.push(buf); })
.on('end', function () {
assert.equal(bufs.length, 1);
var tap = new utils.Tap(bufs[0]);
idType._read(tap);
assert(tap.buf[tap.pos++]); // Error byte.
tap.pos++; // Union marker.
assert(/unknown message/.test(tap.readString()));
done();
});
[
idType.toBuffer(-1),
new Buffer([4, 104, 105]), // `hi` message.
new Buffer(0) // End of frame.
].forEach(function (buf) {
transports[0].writable.write(frame(buf));
});
transports[0].writable.end();
});
});
test('invalid request', function (done) {
var ptcl = createProtocol({
protocol: 'Heartbeat',
messages: {beat: {
request: [{name: 'id', type: 'string'}],
response: 'boolean'
}}
});
var transports = createPassthroughTransports();
var ee = ptcl.createListener(transports[1])
.on('eot', function () { transports[1].writable.end(); });
ptcl.createEmitter(transports[0])
.on('handshake', function () {
// Handshake is complete now.
this.destroy();
var idType = ee._idType;
var bufs = [];
transports[0].readable
.pipe(new protocols.streams.MessageDecoder())
.on('data', function (buf) { bufs.push(buf); })
.on('end', function () {
assert.equal(bufs.length, 1);
var tap = new utils.Tap(bufs[0]);
idType._read(tap);
assert.equal(tap.buf[tap.pos++], 1); // Error byte.
assert.equal(tap.buf[tap.pos++], 0); // Union marker.
assert(/invalid request/.test(tap.readString()));
done();
});
[
idType.toBuffer(-1),
new Buffer([8, 98, 101, 97, 116]), // `beat` message.
new Buffer([8]), // Invalid Avro string encoding.
new Buffer(0) // End of frame.
].forEach(function (buf) {
transports[0].writable.write(frame(buf));
});
transports[0].writable.end();
});
});
test('destroy', function (done) {
var ptcl = createProtocol({
protocol: 'Heartbeat',
messages: {beat: {request: [], response: 'boolean'}}
}).on('beat', function (req, ee, cb) {
ee.destroy();
setTimeout(function () { cb(null, true); }, 10);
});
var transports = createPassthroughTransports();
var responded = false;
ptcl.createListener(transports[1])
.on('eot', function () {
assert(responded); // Works because the transport is sync.
done();
});
ptcl.emit('beat', {}, ptcl.createEmitter(transports[0]), function () {
responded = true;
});
});
});
suite('StatelessListener', function () {
test('unknown message', function (done) {
var ptcl = createProtocol({
protocol: 'Heartbeat',
messages: {beat: {request: [], response: 'boolean'}}
});
var readable = new stream.PassThrough();
var writable = new stream.PassThrough();
var ee = ptcl.createListener(function (cb) {
cb(writable);
return readable;
});
var bufs = [];
writable.pipe(new protocols.streams.MessageDecoder())
.on('data', function (buf) { bufs.push(buf); })
.on('end', function () {
assert.equal(bufs.length, 1);
var tap = new utils.Tap(bufs[0]);
tap.pos = 4; // Skip handshake response.
ee._idType._read(tap); // Skip metadata.
assert.equal(tap.buf[tap.pos++], 1); // Error.
assert.equal(tap.buf[tap.pos++], 0); // Union flag.
assert(/unknown message/.test(tap.readString()));
done();
});
var hash = new Buffer(ptcl._hashString, 'binary');
var req = {
clientHash: hash,
clientProtocol: null,
serverHash: hash
};
var encoder = new protocols.streams.MessageEncoder(64);
encoder.pipe(readable);
encoder.end(Buffer.concat([
HANDSHAKE_REQUEST_TYPE.toBuffer(req),
new Buffer([0]), // Empty metadata.
new Buffer([4, 104, 105]) // `id` message.
]));
});
test('late writable', function (done) {
var ptcl = createProtocol({
protocol: 'Heartbeat',
messages: {beat: {request: [], response: 'boolean'}}
}).on('beat', function (req, ee, cb) {
cb(null, true);
});
var readable = new stream.PassThrough();
var writable = new stream.PassThrough();
ptcl.createListener(function (cb) {
setTimeout(function () { cb(readable); }, 10);
return writable;
});
var ee = ptcl.createEmitter(function (cb) {
cb(readable);
return writable;
});
ptcl.emit('beat', {}, ee, function (err, res) {
assert.strictEqual(err, null);
assert.equal(res, true);
done();
});
});
});
suite('emit', function () {
suite('stateful', function () {
run(function (emitterPtcl, listenerPtcl, cb) {
var pt1 = new stream.PassThrough();
var pt2 = new stream.PassThrough();
var opts = {bufferSize: 48};
cb(
emitterPtcl.createEmitter({readable: pt1, writable: pt2}, opts),
listenerPtcl.createListener({readable: pt2, writable: pt1}, opts)
);
});
});
suite('stateless', function () {
run(function (emitterPtcl, listenerPtcl, cb) {
cb(emitterPtcl.createEmitter(writableFactory));
function writableFactory(emitterCb) {
var reqPt = new stream.PassThrough()
.on('finish', function () {
listenerPtcl.createListener(function (listenerCb) {
var resPt = new stream.PassThrough()
.on('finish', function () { emitterCb(resPt); });
listenerCb(resPt);
return reqPt;
});
});
return reqPt;
}
});
});
function run(setupFn) {
test('single', function (done) {
var ptcl = createProtocol({
protocol: 'Math',
messages: {
negate: {
request: [{name: 'n', type: 'int'}],
response: 'int'
}
}
});
setupFn(ptcl, ptcl, function (ee) {
ee.on('eot', function () { done(); });
ptcl.on('negate', function (req, ee, cb) { cb(null, -req.n); });
ptcl.emit('negate', {n: 20}, ee, function (err, res) {
assert.equal(this, ptcl);
assert.strictEqual(err, null);
assert.equal(res, -20);
this.emit('negate', {n: 'hi'}, ee, function (err) {
assert(/invalid "int"/.test(err.string));
ee.destroy();
});
});
});
});
test('invalid request', function (done) {
var ptcl = createProtocol({
protocol: 'Math',
messages: {
negate: {
request: [{name: 'n', type: 'int'}],
response: 'int'
}
}
}).on('negate', function () { assert(false); });
setupFn(ptcl, ptcl, function (ee) {
ee.on('eot', function () { done(); });
ptcl.emit('negate', {n: 'a'}, ee, function (err) {
assert(/invalid "int"/.test(err.string), null);
ee.destroy();
});
});
});
test('error response', function (done) {
var msg = 'must be non-negative';
var ptcl = createProtocol({
protocol: 'Math',
messages: {
sqrt: {
request: [{name: 'n', type: 'float'}],
response: 'float'
}
}
}).on('sqrt', function (req, ee, cb) {
var n = req.n;
if (n < 0) {
cb({string: msg});
} else {
cb(null, Math.sqrt(n));
}
});
setupFn(ptcl, ptcl, function (ee) {
ptcl.emit('sqrt', {n: 100}, ee, function (err, res) {
assert(Math.abs(res - 10) < 1e-5);
ptcl.emit('sqrt', {n: - 10}, ee, function (err) {
assert.equal(this, ptcl);
assert.equal(err.string, msg);
done();
});
});
});
});
test('invalid response', function (done) {
var ptcl = createProtocol({
protocol: 'Math',
messages: {
sqrt: {
request: [{name: 'n', type: 'float'}],
response: 'float'
}
}
}).on('sqrt', function (req, ee, cb) {
var n = req.n;
if (n < 0) {
cb(null, 'complex'); // Invalid response.
} else {
cb(null, Math.sqrt(n));
}
});
setupFn(ptcl, ptcl, function (ee) {
ptcl.emit('sqrt', {n: - 10}, ee, function (err) {
// The server error message is propagated to the client.
assert(/invalid "float"/.test(err.string));
ptcl.emit('sqrt', {n: 100}, ee, function (err, res) {
// And the server doesn't die (we can make a new request).
assert(Math.abs(res - 10) < 1e-5);
done();
});
});
});
});
test('invalid error', function (done) {
var ptcl = createProtocol({
protocol: 'Math',
messages: {
sqrt: {
request: [{name: 'n', type: 'float'}],
response: 'float'
}
}
}).on('sqrt', function (req, ee, cb) {
var n = req.n;
if (n < 0) {
cb({error: 'complex'}); // Invalid error.
} else {
cb(null, Math.sqrt(n));
}
});
setupFn(ptcl, ptcl, function (ee) {
ptcl.emit('sqrt', {n: - 10}, ee, function (err) {
assert(/invalid \["string"\]/.test(err.string));
ptcl.emit('sqrt', {n: 100}, ee, function (err, res) {
// The server still doesn't die (we can make a new request).
assert(Math.abs(res - 10) < 1e-5);
done();
});
});
});
});
test('out of order', function (done) {
var ptcl = createProtocol({
protocol: 'Delay',
messages: {
wait: {
request: [
{name: 'ms', type: 'float'},
{name: 'id', type: 'string'}
],
response: 'string'
}
}
}).on('wait', function (req, ee, cb) {
var delay = req.ms;
if (delay < 0) {
cb(new Error('delay must be non-negative'));
return;
}
setTimeout(function () { cb(null, req.id); }, delay);
});
var ids = [];
setupFn(ptcl, ptcl, function (ee) {
ee.on('eot', function (pending) {
assert.equal(pending, 0);
assert.deepEqual(ids, [null, 'b', 'a']);
done();
});
ptcl.emit('wait', {ms: 100, id: 'a'}, ee, function (err, res) {
assert.strictEqual(err, null);
ids.push(res);
});
ptcl.emit('wait', {ms: 10, id: 'b'}, ee, function (err, res) {
assert.strictEqual(err, null);
ids.push(res);
ee.destroy();
});
ptcl.emit('wait', {ms: -100, id: 'c'}, ee, function (err, res) {
assert(/non-negative/.test(err.string));
ids.push(res);
});
});
});
test('compatible protocols', function (done) {
var emitterPtcl = createProtocol({
protocol: 'emitterProtocol',
messages: {
age: {
request: [{name: 'name', type: 'string'}],
response: 'long'
}
}
});
var listenerPtcl = createProtocol({
protocol: 'serverProtocol',
messages: {
age: {
request: [
{name: 'name', type: 'string'},
{name: 'address', type: ['null', 'string'], 'default': null}
],
response: 'int'
},
id: {
request: [{name: 'name', type: 'string'}],
response: 'long'
}
}
});
setupFn(
emitterPtcl,
listenerPtcl,
function (ee) {
listenerPtcl.on('age', function (req, ee, cb) {
assert.equal(req.name, 'Ann');
cb(null, 23);
});
emitterPtcl.emit('age', {name: 'Ann'}, ee, function (err, res) {
assert.strictEqual(err, null);
assert.equal(res, 23);
done();
});
}
);
});
test('cached compatible protocols', function (done) {
var ptcl1 = createProtocol({
protocol: 'emitterProtocol',
messages: {
age: {
request: [{name: 'name', type: 'string'}],
response: 'long'
}
}
});
var ptcl2 = createProtocol({
protocol: 'serverProtocol',
messages: {
age: {
request: [
{name: 'name', type: 'string'},
{name: 'address', type: ['null', 'string'], 'default': null}
],
response: 'int'
},
id: {
request: [{name: 'name', type: 'string'}],
response: 'long'
}
}
}).on('age', function (req, ee, cb) { cb(null, 48); });
setupFn(
ptcl1,
ptcl2,
function (ee1) {
ptcl1.emit('age', {name: 'Ann'}, ee1, function (err, res) {
assert.equal(res, 48);
setupFn(
ptcl1,
ptcl2,
function (ee2) { // ee2 has the server's protocol.
ptcl1.emit('age', {name: 'Bob'}, ee2, function (err, res) {
assert.equal(res, 48);
done();
});
}
);
});
}
);
});
test('incompatible protocols', function (done) {
var emitterPtcl = createProtocol({
protocol: 'emitterProtocol',
messages: {
age: {request: [{name: 'name', type: 'string'}], response: 'long'}
}
});
var listenerPtcl = createProtocol({
protocol: 'serverProtocol',
messages: {
age: {request: [{name: 'name', type: 'int'}], response: 'long'}
}
}).on('age', function (req, ee, cb) { cb(null, 0); });
setupFn(
emitterPtcl,
listenerPtcl,
function (ee) {
ee.on('error', function () {}); // For stateful protocols.
emitterPtcl.emit('age', {name: 'Ann'}, ee, function (err) {
assert(err);
done();
});
}
);
});
test('unknown message', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
setupFn(ptcl, ptcl, function (ee) {
ptcl.emit('echo', {}, ee, function (err) {
assert(/unknown/.test(err.string));
done();
});
});
});
test('unsupported message', function (done) {
var ptcl = createProtocol({
protocol: 'Echo',
messages: {
echo: {
request: [{name: 'id', type: 'string'}],
response: 'string'
}
}
});
setupFn(ptcl, ptcl, function (ee) {
ptcl.emit('echo', {id: ''}, ee, function (err) {
assert(/unsupported/.test(err.string));
done();
});
});
});
test('destroy emitter noWait', function (done) {
var ptcl = createProtocol({
protocol: 'Delay',
messages: {
wait: {
request: [{name: 'ms', type: 'int'}],
response: 'string'
}
}
}).on('wait', function (req, ee, cb) {
setTimeout(function () { cb(null, 'ok'); }, req.ms);
});
var interrupted = 0;
var eoted = false;
setupFn(ptcl, ptcl, function (ee) {
ee.on('eot', function (pending) {
eoted = true;
assert.equal(interrupted, 2);
assert.equal(pending, 2);
done();
});
ptcl.emit('wait', {ms: 75}, ee, interruptedCb);
ptcl.emit('wait', {ms: 50}, ee, interruptedCb);
ptcl.emit('wait', {ms: 10}, ee, function (err, res) {
assert.equal(res, 'ok');
ee.destroy(true);
});
function interruptedCb(err) {
assert(/interrupted/.test(err.string));
interrupted++;
}
});
});
test('destroy emitter', function (done) {
var ptcl = createProtocol({
protocol: 'Math',
messages: {
negate: {
request: [{name: 'n', type: 'int'}],
response: 'int'
}
}
});
setupFn(ptcl, ptcl, function (ee) {
ptcl.on('negate', function (req, ee, cb) { cb(null, -req.n); });
ptcl.emit('negate', {n: 20}, ee, function (err, res) {
assert.strictEqual(err, null);
assert.equal(res, -20);
ee.destroy();
this.emit('negate', {n: 'hi'}, ee, function (err) {
assert(/destroyed/.test(err.string));
done();
});
});
});
});
}
});
test('throw error', function () {
assert(!tryCatch(null));
assert.equal(tryCatch(new Error('hi')), 'hi');
assert.equal(tryCatch('hi'), 'hi');
assert.equal(tryCatch({string: 'hi'}), 'hi');
function tryCatch(err) {
try {
protocols.throwError(err);
} catch (err_) {
return err_.message;
}
}
});
});
// Helpers.
// Message framing.
function frame(buf) {
var framed = new Buffer(buf.length + 4);
framed.writeInt32BE(buf.length);
buf.copy(framed, 4);
return framed;
}
function createReadableTransport(bufs, frameSize) {
return createReadableStream(bufs)
.pipe(new protocols.streams.MessageEncoder(frameSize || 64));
}
function createWritableTransport(bufs) {
var decoder = new protocols.streams.MessageDecoder();
decoder.pipe(createWritableStream(bufs));
return decoder;
}
function createTransport(readBufs, writeBufs) {
return toDuplex(
createReadableTransport(readBufs),
createWritableTransport(writeBufs)
);
}
function createPassthroughTransports() {
var pt1 = stream.PassThrough();
var pt2 = stream.PassThrough();
return [{readable: pt1, writable: pt2}, {readable: pt2, writable: pt1}];
}
// Simplified stream constructor API isn't available in earlier node versions.
function createReadableStream(bufs) {
var n = 0;
function Stream() { stream.Readable.call(this); }
util.inherits(Stream, stream.Readable);
Stream.prototype._read = function () {
this.push(bufs[n++] || null);
};
var readable = new Stream();
return readable;
}
function createWritableStream(bufs) {
function Stream() { stream.Writable.call(this); }
util.inherits(Stream, stream.Writable);
Stream.prototype._write = function (buf, encoding, cb) {
bufs.push(buf);
cb();
};
return new Stream();
}
// Combine two (binary) streams into a single duplex one. This is very basic
// and doesn't handle a lot of cases (e.g. where `_read` doesn't return
// something).
function toDuplex(readable, writable) {
function Stream() {
stream.Duplex.call(this);
this.on('finish', function () { writable.end(); });
}
util.inherits(Stream, stream.Duplex);
Stream.prototype._read = function () {
this.push(readable.read());
};
Stream.prototype._write = function (buf, encoding, cb) {
writable.write(buf);
cb();
};
return new Stream();
}