| /* jshint node: true */ |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| 'use strict'; |
| |
| /** |
| * This module implements Avro's IPC/RPC logic. |
| * |
| * This is done the Node.js way, mimicking the `EventEmitter` class. |
| * |
| */ |
| |
| var schemas = require('./schemas'), |
| utils = require('./utils'), |
| events = require('events'), |
| stream = require('stream'), |
| util = require('util'); |
| |
| |
// Types used to frame calls and responses on the wire.
var BOOLEAN_TYPE = schemas.createType('boolean'),
    STRING_TYPE = schemas.createType('string'),
    SYSTEM_ERROR_TYPE = schemas.createType(['string']);

// Handshake request record. It also declares the `MD5` fixed type, which the
// `serverHash` field then references by its fully qualified name.
var HANDSHAKE_REQUEST_TYPE = schemas.createType({
  namespace: 'org.apache.avro.ipc',
  name: 'HandshakeRequest',
  type: 'record',
  fields: [
    {
      name: 'clientHash',
      type: {name: 'MD5', type: 'fixed', size: 16}
    },
    {
      name: 'clientProtocol',
      type: ['null', 'string'],
      'default': null
    },
    {
      name: 'serverHash',
      type: 'org.apache.avro.ipc.MD5'
    },
    {
      name: 'meta',
      type: ['null', {type: 'map', values: 'bytes'}],
      'default': null
    }
  ]
});

// Handshake response record; every field except `match` is optional.
var HANDSHAKE_RESPONSE_TYPE = schemas.createType({
  namespace: 'org.apache.avro.ipc',
  name: 'HandshakeResponse',
  type: 'record',
  fields: [
    {
      name: 'match',
      type: {
        name: 'HandshakeMatch',
        type: 'enum',
        symbols: ['BOTH', 'CLIENT', 'NONE']
      }
    },
    {
      name: 'serverProtocol',
      type: ['null', 'string'],
      'default': null
    },
    {
      name: 'serverHash',
      type: ['null', {name: 'MD5', type: 'fixed', size: 16}],
      'default': null
    },
    {
      name: 'meta',
      type: ['null', {type: 'map', values: 'bytes'}],
      'default': null
    }
  ]
});

// Record constructors and short aliases used throughout this module.
var HandshakeRequest = HANDSHAKE_REQUEST_TYPE.getRecordConstructor(),
    HandshakeResponse = HANDSHAKE_RESPONSE_TYPE.getRecordConstructor(),
    Tap = utils.Tap,
    f = util.format;
| |
| |
/**
 * Build a `Protocol` from its parsed declaration.
 *
 * Use this rather than the `Protocol` constructor: the constructor performs
 * no logic so that copying a protocol stays cheap.
 *
 * @param attrs {Object} Protocol declaration. `protocol` is required;
 * `namespace`, `types` and `messages` are optional.
 * @param opts {Object} Options forwarded to `schemas.createType` and to each
 * `Message`. Its `namespace` key is overwritten with `attrs.namespace`.
 *
 */
function createProtocol(attrs, opts) {
  var options = opts || {};

  var ptclName = attrs.protocol;
  if (!ptclName) {
    throw new Error('missing protocol name');
  }

  // Qualify the name with the namespace unless it is already dotted.
  var namespace = attrs.namespace;
  options.namespace = namespace;
  if (namespace && ptclName.indexOf('.') === -1) {
    ptclName = f('%s.%s', namespace, ptclName);
  }

  // Types are created only for their effect on `options` (shared with the
  // messages created below); the returned values are discarded.
  var typeAttrs = attrs.types || [];
  typeAttrs.forEach(function (typeAttr) {
    schemas.createType(typeAttr, options);
  });

  var messageAttrs = attrs.messages || {};
  var messages = Object.keys(messageAttrs).reduce(function (acc, key) {
    acc[key] = new Message(key, messageAttrs[key], options);
    return acc;
  }, {});

  return new Protocol(ptclName, messages, options.registry || {});
}
| |
/**
 * An Avro protocol.
 *
 * It contains a cache for all remote protocols encountered by its emitters and
 * listeners. Note that a protocol can be attached to multiple listeners at a
 * given time. This can be a mix of stateful or stateless listeners.
 *
 * @param name {String} Protocol name.
 * @param messages {Object} Messages, keyed by name.
 * @param types {Object} Types, keyed by name.
 * @param ptcl {Protocol} Optional parent protocol. When set, handlers fall
 * back to the parent's and both resolver caches are shared with it.
 *
 */
function Protocol(name, messages, types, ptcl) {
  this._name = name;
  this._messages = messages;
  this._types = types;
  this._parent = ptcl;

  // Cache a string instead of the buffer to avoid retaining an entire slab.
  this._hashString = utils.getHash(this.toString()).toString('binary');

  // Listener callbacks. Note the prototype used for handlers when this is a
  // subprotocol. This lets us easily implement the desired fallback behavior.
  var self = this;
  this._handlers = Object.create(ptcl ? ptcl._handlers : null);
  this._onListenerCall = function (name, req, cb) {
    // `this` is the listener which emitted the call.
    var handler = self._handlers[name];
    if (handler) {
      handler.call(self, req, this, cb);
      return;
    }
    // One-way calls are dispatched without a callback: there is nobody to
    // report the missing handler to, so the call is dropped rather than
    // throwing a `TypeError` from inside the listener's data handler.
    if (cb) {
      cb(new Error(f('unsupported message: %s', name)));
    }
  };

  // Resolvers are split since we want emitters to still be able to talk to
  // servers with more messages (which would be incompatible the other way).
  this._emitterResolvers = ptcl ? ptcl._emitterResolvers : {};
  this._listenerResolvers = ptcl ? ptcl._listenerResolvers : {};
}
| |
/**
 * Create a child protocol with the same name, messages and types, and with
 * this protocol as its parent.
 *
 */
Protocol.prototype.subprotocol = function () {
  var parent = this;
  return new Protocol(parent._name, parent._messages, parent._types, parent);
};
| |
/**
 * Send a message through an emitter created from this protocol.
 *
 * @param name {String} Message name.
 * @param req {Object} Request value.
 * @param emitter {MessageEmitter} Emitter whose protocol hash matches this
 * protocol's.
 * @param cb {Function} Optional callback; `throwError` is used when omitted.
 *
 */
Protocol.prototype.emit = function (name, req, emitter, cb) {
  // Defaulting to `throwError` provides a more helpful error message.
  var callback = cb || throwError;

  var isCompatible = emitter instanceof MessageEmitter &&
    emitter._ptcl._hashString === this._hashString;
  if (!isCompatible) {
    asyncAvroCb(this, callback, 'invalid emitter');
    return;
  }

  var message = this._messages[name];
  if (message) {
    emitter._emit(message, req, callback);
  } else {
    asyncAvroCb(this, callback, f('unknown message: %s', name));
  }
};
| |
/**
 * Create an emitter for this protocol.
 *
 * @param transport {Function|Stream|Object} A function yields a stateless
 * emitter; a stream, or an object with `readable` and `writable` properties,
 * yields a stateful one.
 * @param opts {Object} Optional emitter options (may be omitted, with `cb`
 * passed in its place).
 * @param cb {Function} Optional handler registered once on `'eot'`.
 *
 */
Protocol.prototype.createEmitter = function (transport, opts, cb) {
  if (!cb && typeof opts === 'function') {
    cb = opts;
    opts = undefined;
  }

  var emitter;
  if (typeof transport === 'function') {
    emitter = new StatelessEmitter(this, transport, opts);
  } else {
    // A single duplex stream serves as both ends of the connection.
    var isDuplex = isStream(transport);
    emitter = new StatefulEmitter(
      this,
      isDuplex ? transport : transport.readable,
      isDuplex ? transport : transport.writable,
      opts
    );
  }

  if (cb) {
    emitter.once('eot', cb);
  }
  return emitter;
};
| |
/**
 * Register the handler for one of this protocol's messages, replacing any
 * handler previously set for it on this protocol.
 *
 * @param name {String} Message name; must be declared by the protocol.
 * @param handler {Function} Called with the request, the listener and the
 * response callback, with the protocol as `this`.
 * @return {Protocol} This protocol, for chaining.
 *
 */
Protocol.prototype.on = function (name, handler) {
  var isKnown = !!this._messages[name];
  if (!isKnown) {
    throw new Error(f('unknown message: %s', name));
  }
  this._handlers[name] = handler;
  return this;
};
| |
/**
 * Create a listener dispatching incoming calls to this protocol's handlers.
 *
 * @param transport {Function|Stream|Object} A function yields a stateless
 * listener; a stream, or an object with `readable` and `writable` properties,
 * yields a stateful one.
 * @param opts {Object} Optional listener options (may be omitted, with `cb`
 * passed in its place).
 * @param cb {Function} Optional handler registered once on `'eot'`.
 *
 */
Protocol.prototype.createListener = function (transport, opts, cb) {
  if (!cb && typeof opts === 'function') {
    cb = opts;
    opts = undefined;
  }

  var listener;
  if (typeof transport === 'function') {
    listener = new StatelessListener(this, transport, opts);
  } else {
    // A single duplex stream serves as both ends of the connection.
    var isDuplex = isStream(transport);
    listener = new StatefulListener(
      this,
      isDuplex ? transport : transport.readable,
      isDuplex ? transport : transport.writable,
      opts
    );
  }

  if (cb) {
    listener.once('eot', cb);
  }
  listener.on('_call', this._onListenerCall);
  return listener;
};
| |
/** Look up one of the protocol's types by name. */
Protocol.prototype.getType = function (name) {
  var types = this._types;
  return types[name];
};

/** The protocol's name, as stored at construction. */
Protocol.prototype.getName = function () {
  return this._name;
};

/** The protocol's messages, keyed by name (not a copy). */
Protocol.prototype.getMessages = function () {
  return this._messages;
};
| |
/**
 * Serialize the protocol via `schemas.stringify`.
 *
 * Only types with a name are listed; the `types` key is left undefined when
 * there are none.
 *
 */
Protocol.prototype.toString = function () {
  var registry = this._types;
  var namedTypes = Object.keys(registry)
    .map(function (key) { return registry[key]; })
    .filter(function (type) { return type.getName(); });

  var declaration = {
    protocol: this._name,
    types: namedTypes.length ? namedTypes : undefined,
    messages: this._messages
  };
  return schemas.stringify(declaration);
};
| |
/** Short description of the protocol: its JSON-quoted name in a tag. */
Protocol.prototype.inspect = function () {
  var name = this._name;
  return f('<Protocol %j>', name);
};
| |
/**
 * Base message emitter class.
 *
 * See below for the two available variants.
 *
 * @param ptcl {Protocol} The emitter's protocol.
 * @param opts {Object} Optional settings: `IdType` (metadata type
 * constructor), `bufferSize` and `frameSize` (both default to 2048).
 *
 */
function MessageEmitter(ptcl, opts) {
  events.EventEmitter.call(this);
  // Default the options here as well (as `MessageListener` does) so that the
  // base class doesn't throw when constructed without any.
  opts = opts || {};

  this._ptcl = ptcl;
  this._resolvers = ptcl._emitterResolvers;
  this._serverHashString = ptcl._hashString;
  this._idType = IdType.createMetadataType(opts.IdType);
  this._bufferSize = opts.bufferSize || 2048;
  this._frameSize = opts.frameSize || 2048;

  // The public `'eot'` event fires at most once, however many times the
  // internal `'_eot'` is emitted.
  this.once('_eot', function (pending) { this.emit('eot', pending); });
}
util.inherits(MessageEmitter, events.EventEmitter);
| |
/**
 * Compute and cache the resolvers needed to decode a server's responses.
 *
 * @param hashString {String} Key under which the resolvers are cached.
 * @param serverPtcl {Protocol} The server's protocol.
 *
 * Throws if the server lacks one of the emitter's messages; nothing is cached
 * in that case.
 *
 */
MessageEmitter.prototype._generateResolvers = function (
  hashString, serverPtcl
) {
  var emitterMessages = this._ptcl._messages;
  var serverMessages = serverPtcl._messages;
  var names = Object.keys(emitterMessages);
  var resolvers = {};

  for (var i = 0; i < names.length; i++) {
    var name = names[i];
    var emitterMessage = emitterMessages[name];
    var serverMessage = serverMessages[name];
    if (!serverMessage) {
      throw new Error(f('missing server message: %s', name));
    }
    var responseResolver = emitterMessage.responseType.createResolver(
      serverMessage.responseType
    );
    var errorResolver = emitterMessage.errorType.createResolver(
      serverMessage.errorType
    );
    resolvers[name] = {
      responseType: responseResolver,
      errorType: errorResolver
    };
  }

  this._resolvers[hashString] = resolvers;
};
| |
/**
 * Build a handshake request for this emitter's protocol.
 *
 * @param hashString {String} Binary string of the expected server hash.
 * @param noPtcl {Boolean} When truthy, the emitter's protocol is left out of
 * the request (only its hash is sent).
 *
 */
MessageEmitter.prototype._createHandshakeRequest = function (
  hashString, noPtcl
) {
  var clientHash = getHash(this._ptcl);
  var clientProtocol = noPtcl ? null : {string: this._ptcl.toString()};
  var serverHash = new Buffer(hashString, 'binary');
  return new HandshakeRequest(clientHash, clientProtocol, serverHash);
};
| |
/**
 * Read the server's handshake response and update resolver state.
 *
 * Emits `'handshake'` with the request and the response.
 *
 * @param tap {Tap} Tap positioned at the start of the handshake response.
 * @param handshakeReq {HandshakeRequest} The request being answered.
 * @return {Object} `{match, serverHashString}`.
 *
 * Throws when the response is `NONE` although the request already carried
 * the emitter's protocol.
 *
 */
MessageEmitter.prototype._finalizeHandshake = function (tap, handshakeReq) {
  var handshakeRes = HANDSHAKE_RESPONSE_TYPE._read(tap);
  this.emit('handshake', handshakeReq, handshakeRes);

  if (handshakeReq.clientProtocol && handshakeRes.match === 'NONE') {
    // If the emitter's protocol was included in the original request, this is
    // not a failure which a retry will fix.
    var meta = handshakeRes.meta;
    var errorBuf = meta && meta.map.error;
    throw new Error(errorBuf ? errorBuf.toString() : 'handshake error');
  }

  if (!handshakeRes.serverHash || !handshakeRes.serverProtocol) {
    // The hash we sent was accepted. We return it for stateless emitters: the
    // default hash might change in between requests, in which case using the
    // default one would fail.
    return {
      match: handshakeRes.match,
      serverHashString: handshakeReq.serverHash.toString('binary')
    };
  }

  // The request didn't include the correct server hash. Note that we use the
  // handshake response's hash rather than our computed one in case the server
  // computes it differently.
  var md5 = handshakeRes.serverHash['org.apache.avro.ipc.MD5'];
  var hashString = md5.toString('binary');
  if (!canResolve(this, hashString)) {
    var serverAttrs = JSON.parse(handshakeRes.serverProtocol.string);
    this._generateResolvers(hashString, createProtocol(serverAttrs));
  }
  // Make this hash the new default.
  this._serverHashString = hashString;

  return {match: handshakeRes.match, serverHashString: hashString};
};
| |
/**
 * Write a call to the tap: the message's name, then the request itself.
 *
 * @param tap {Tap} Tap written to (grown if needed).
 * @param message {Message} Message being emitted.
 * @param req {Object} Request value.
 *
 */
MessageEmitter.prototype._encodeRequest = function (tap, message, req) {
  var parts = [
    [STRING_TYPE, message.name],
    [message.requestType, req]
  ];
  parts.forEach(function (part) { safeWrite(tap, part[0], part[1]); });
};
| |
/**
 * Decode a response into callback arguments.
 *
 * @param tap {Tap} Tap positioned at the response's error flag.
 * @param hashString {String} Hash of the protocol the response was written
 * with.
 * @param message {Message} Message the response answers.
 * @return {Array} `[err, null]` when the flag is set, `[null, res]` otherwise.
 *
 * Throws `'truncated message'` if the tap is invalid after reading.
 *
 */
MessageEmitter.prototype._decodeArguments = function (
  tap, hashString, message
) {
  var resolvers = getResolvers(this, hashString, message);

  var isError = tap.readBoolean();
  var value = isError ?
    resolvers.errorType._read(tap) :
    resolvers.responseType._read(tap);

  if (!tap.isValid()) {
    throw new Error('truncated message');
  }
  return isError ? [value, null] : [null, value];
};
| |
/**
 * Factory-based emitter.
 *
 * This emitter doesn't keep a persistent connection to the server and
 * prepends a handshake to each message emitted. Usage examples include
 * talking to an HTTP server (where the factory returns an HTTP request).
 *
 * Since each message uses its own writable/readable stream pair, this emitter
 * can tell which response corresponds to each request without relying on the
 * messages' metadata. In particular, this means these emitters are compatible
 * with any server implementation.
 *
 * @param ptcl {Protocol} The emitter's protocol.
 * @param writableFactory {Function} Called once per message with a callback
 * accepting the response's readable stream; returns the writable stream the
 * request is sent to.
 * @param opts {Object} Optional settings, see `MessageEmitter`.
 *
 */
function StatelessEmitter(ptcl, writableFactory, opts) {
  MessageEmitter.call(this, ptcl, opts || {});

  this._writableFactory = writableFactory;
  this._id = 1; // ID of the next message.
  this._pending = {}; // Callbacks of in-flight messages, keyed by ID.
  this._destroyed = false; // No new messages accepted.
  this._interrupted = false; // In-flight responses are ignored.
}
util.inherits(StatelessEmitter, MessageEmitter);
| |
/**
 * Send one message over a fresh stream pair obtained from the factory.
 *
 * The first attempt omits the emitter's protocol from the handshake; if the
 * server answers `NONE`, the message is sent again with it attached.
 *
 * @param message {Message} Message to emit.
 * @param req {Object} Request value.
 * @param cb {Function} Called at most once with `(err, res)`, with the
 * emitter's protocol as `this`.
 *
 */
StatelessEmitter.prototype._emit = function (message, req, cb) {
  // We enclose the server's hash inside this message's closure since the
  // emitter might be emitting several messages concurrently and the hash
  // might change before the response returns (unlikely but possible if the
  // emitter talks to multiple servers at once or the server changes
  // protocol).
  var serverHashString = this._serverHashString;
  var id = this._id++;
  var self = this;

  this._pending[id] = cb;
  if (this._destroyed) {
    asyncAvroCb(undefined, done, 'emitter destroyed');
    return;
  }
  send(false);

  function send(retry) {
    var tap = new Tap(new Buffer(self._bufferSize));

    var handshakeReq = self._createHandshakeRequest(serverHashString, !retry);
    safeWrite(tap, HANDSHAKE_REQUEST_TYPE, handshakeReq);
    try {
      safeWrite(tap, self._idType, id);
      self._encodeRequest(tap, message, req);
    } catch (err) {
      asyncAvroCb(undefined, done, err);
      return;
    }

    var writable = self._writableFactory(function onReadable(readable) {
      if (self._interrupted) {
        // In case this function is called asynchronously (e.g. when sending
        // HTTP requests), it might be that we have ended since.
        return;
      }

      readable
        .pipe(new MessageDecoder(true))
        // Errors include the readable stream ending before a single message
        // has been decoded (e.g. on invalid response).
        .on('error', done)
        .on('data', function (buf) {
          readable.unpipe(this); // Single message per readable stream.
          if (self._interrupted) {
            return;
          }

          var resTap = new Tap(buf);
          var args;
          try {
            var info = self._finalizeHandshake(resTap, handshakeReq);
            serverHashString = info.serverHashString;
            if (info.match === 'NONE') {
              send(true); // Retry, attaching emitter protocol this time.
              return;
            }
            self._idType._read(resTap); // Skip metadata.
            args = self._decodeArguments(resTap, serverHashString, message);
          } catch (err) {
            done(err);
            return;
          }
          done.apply(undefined, args);
        });
    });

    var encoder = new MessageEncoder(self._frameSize);
    encoder.pipe(writable);
    encoder.end(tap.getValue());
  }

  function done(err, res) {
    var pendingCb = self._pending[id];
    if (!pendingCb) {
      // Already settled, or flushed by `destroy(true)`: a late stream error
      // must not call a missing callback (or call a callback twice).
      return;
    }
    delete self._pending[id];
    pendingCb.call(self._ptcl, err, res);
    if (self._destroyed) {
      self.destroy();
    }
  }
};
| |
/**
 * Stop accepting new messages.
 *
 * @param noWait {Boolean} When truthy, in-flight messages are failed right
 * away with an `'interrupted'` error instead of being waited for.
 *
 * `'_eot'` is emitted (with the number of messages that were pending at the
 * time of the call) when `noWait` is set or nothing is in flight.
 *
 */
StatelessEmitter.prototype.destroy = function (noWait) {
  this._destroyed = true;

  var pending = this._pending;
  var ids = Object.keys(pending);
  if (noWait) {
    this._interrupted = true;
    for (var i = 0; i < ids.length; i++) {
      var id = ids[i];
      pending[id]({string: 'interrupted'});
      delete pending[id];
    }
  }

  var canEnd = noWait || ids.length === 0;
  if (canEnd) {
    this.emit('_eot', ids.length);
  }
};
| |
/**
 * Multiplexing emitter.
 *
 * These emitters reuse the same streams (both readable and writable) for all
 * messages. This avoids a lot of overhead (e.g. creating new connections,
 * re-issuing handshakes) but requires the server to include compatible
 * metadata in each response (namely forwarding each request's ID into its
 * response).
 *
 * A custom metadata format can be specified via the `IdType` option. The
 * default is compatible with this package's default server (i.e. listener)
 * implementation.
 *
 * @param ptcl {Protocol} The emitter's protocol.
 * @param readable {Stream} Stream responses are read from.
 * @param writable {Stream} Stream requests are written to.
 * @param opts {Object} Optional settings, see `MessageEmitter`.
 *
 */
function StatefulEmitter(ptcl, readable, writable, opts) {
  opts = opts || {};
  MessageEmitter.call(this, ptcl, opts);

  this._readable = readable;
  this._writable = writable;
  this._id = 1;
  this._pending = {};
  this._started = false;
  this._destroyed = false;
  this._ended = false; // Readable input ended.
  this._decoder = new MessageDecoder();
  this._encoder = new MessageEncoder(this._frameSize);

  var handshakeReq = null;
  var self = this;

  // Piping is deferred by a tick, so event handlers attached right after
  // construction are in place before any data flows.
  process.nextTick(function () {
    self._readable.pipe(self._decoder)
      .on('error', function (err) { self.emit('error', err); })
      .on('data', onHandshakeData)
      .on('end', function () {
        self._ended = true;
        self.destroy();
      });

    self._encoder.pipe(self._writable);
    emitHandshake(true);
  });

  function emitHandshake(noPtcl) {
    handshakeReq = self._createHandshakeRequest(
      self._serverHashString,
      noPtcl
    );
    self._encoder.write(handshakeReq.$toBuffer());
  }

  function onHandshakeData(buf) {
    var tap = new Tap(buf);
    var info;
    try {
      info = self._finalizeHandshake(tap, handshakeReq);
    } catch (err) {
      self.emit('error', err);
      self.destroy(); // This isn't a recoverable error.
      return;
    }

    if (info.match === 'NONE') {
      emitHandshake(false); // Retry, attaching our protocol this time.
      return;
    }
    self._decoder
      .removeListener('data', onHandshakeData)
      .on('data', onMessageData);
    self._started = true;
    self.emit('_start'); // Send any pending messages.
  }

  function onMessageData(buf) {
    var tap = new Tap(buf);
    var id;
    try {
      id = self._idType._read(tap);
      if (!id) {
        throw new Error('missing ID');
      }
    } catch (err) {
      self.emit('error', new Error('invalid metadata: ' + err.message));
      return;
    }

    var info = self._pending[id];
    if (info === undefined) {
      self.emit('error', new Error('orphan response: ' + id));
      return;
    }

    // The call is settled whether or not its response decodes. Leaving it
    // pending after a decoding failure would let its callback fire a second
    // time (with `interrupted`) and would prevent a graceful `destroy` from
    // ever completing.
    delete self._pending[id];
    var args;
    try {
      args = self._decodeArguments(
        tap,
        self._serverHashString,
        info.message
      );
    } catch (err) {
      args = [{string: 'invalid response: ' + err.message}];
    }
    info.cb.apply(self._ptcl, args);
    if (self._destroyed) {
      self.destroy();
    }
  }
}
util.inherits(StatefulEmitter, MessageEmitter);
| |
/**
 * Send a message over the shared connection.
 *
 * Calls made before the handshake completes are replayed on `'_start'`. The
 * callback is only retained for messages which aren't one-way.
 *
 * @param message {Message} Message to emit.
 * @param req {Object} Request value.
 * @param cb {Function} Response callback.
 *
 */
StatefulEmitter.prototype._emit = function (message, req, cb) {
  var emitter = this;

  if (emitter._destroyed) {
    asyncAvroCb(emitter._ptcl, cb, 'emitter destroyed');
    return;
  }

  if (!emitter._started) {
    emitter.once('_start', function () { emitter._emit(message, req, cb); });
    return;
  }

  var tap = new Tap(new Buffer(emitter._bufferSize));
  var id = emitter._id++;
  try {
    // The ID is written negated.
    safeWrite(tap, emitter._idType, -id);
    emitter._encodeRequest(tap, message, req);
  } catch (err) {
    asyncAvroCb(emitter._ptcl, cb, err);
    return;
  }

  var expectsResponse = !message.oneWay;
  if (expectsResponse) {
    emitter._pending[id] = {message: message, cb: cb};
  }
  emitter._encoder.write(tap.getValue());
};
| |
/**
 * Stop accepting new messages and, once possible, disconnect the streams.
 *
 * @param noWait {Boolean} When truthy, don't wait for pending responses.
 *
 * Unless `noWait` is set or the readable side has ended, this returns early
 * while calls are pending. Otherwise remaining calls fail with
 * `'interrupted'`, both pipes are undone and `'_eot'` is emitted with the
 * number of interrupted calls.
 *
 */
StatefulEmitter.prototype.destroy = function (noWait) {
  this._destroyed = true;
  if (!this._started) {
    this.emit('_start'); // Error out any pending calls.
  }

  var pending = this._pending;
  var ids = Object.keys(pending);
  var mustWait = ids.length && !(noWait || this._ended);
  if (mustWait) {
    return; // Wait for pending requests.
  }

  for (var i = 0; i < ids.length; i++) {
    var id = ids[i];
    var callback = pending[id].cb;
    delete pending[id];
    callback({string: 'interrupted'});
  }

  this._readable.unpipe(this._decoder);
  this._encoder.unpipe(this._writable);
  this.emit('_eot', ids.length);
};
| |
/**
 * The server-side emitter equivalent.
 *
 * In particular it is responsible for handling handshakes appropriately.
 *
 * @param ptcl {Protocol} The listener's protocol.
 * @param opts {Object} Optional settings: `IdType` (metadata type
 * constructor), `bufferSize` and `frameSize` (both default to 2048).
 *
 */
function MessageListener(ptcl, opts) {
  events.EventEmitter.call(this);
  var options = opts || {};

  this._ptcl = ptcl;
  this._resolvers = ptcl._listenerResolvers;
  this._emitterHashString = null; // Set when a handshake is validated.

  this._idType = IdType.createMetadataType(options.IdType);
  this._bufferSize = options.bufferSize || 2048;
  this._frameSize = options.frameSize || 2048;

  this._decoder = new MessageDecoder();
  this._encoder = new MessageEncoder(this._frameSize);

  this._destroyed = false;
  this._pending = 0; // Number of calls awaiting a response.

  // The public `'eot'` event fires at most once.
  this.once('_eot', function (pending) {
    this.emit('eot', pending);
  });
}
util.inherits(MessageListener, events.EventEmitter);
| |
/**
 * Compute and cache the resolvers needed to decode an emitter's requests.
 *
 * @param hashString {String} Key under which the resolvers are cached.
 * @param emitterPtcl {Protocol} The emitter's protocol.
 *
 * Throws if the emitter declares a message this listener's protocol lacks;
 * nothing is cached in that case.
 *
 */
MessageListener.prototype._generateResolvers = function (
  hashString, emitterPtcl
) {
  var clientMessages = emitterPtcl._messages;
  var serverMessages = this._ptcl._messages;
  var names = Object.keys(clientMessages);
  var resolvers = {};

  for (var i = 0; i < names.length; i++) {
    var name = names[i];
    var serverMessage = serverMessages[name];
    if (!serverMessage) {
      throw new Error(f('missing server message: %s', name));
    }
    var clientMessage = clientMessages[name];
    var requestResolver = serverMessage.requestType.createResolver(
      clientMessage.requestType
    );
    resolvers[name] = {requestType: requestResolver};
  }

  this._resolvers[hashString] = resolvers;
};
| |
/**
 * Read a handshake request and write the corresponding response out.
 *
 * If an error occurs while processing the request, a response with match
 * `NONE` is written. A `'handshake'` event is emitted with both the request
 * and the response.
 *
 * @param reqTap {Tap} Tap positioned at the handshake request.
 * @param resTap {Tap} Tap the handshake response is written to.
 * @return {Boolean} Whether the handshake was valid.
 *
 */
MessageListener.prototype._validateHandshake = function (reqTap, resTap) {
  var ptcl = this._ptcl;
  var validationErr = null;
  var handshakeReq, serverHashString;

  try {
    handshakeReq = HANDSHAKE_REQUEST_TYPE._read(reqTap);
    serverHashString = handshakeReq.serverHash.toString('binary');
  } catch (err) {
    validationErr = err;
  }

  if (!validationErr) {
    var emitterHashString = handshakeReq.clientHash.toString('binary');
    this._emitterHashString = emitterHashString;
    if (!canResolve(this, emitterHashString)) {
      var emitterPtcl = handshakeReq.clientProtocol;
      if (!emitterPtcl) {
        validationErr = new Error('unknown client protocol hash');
      } else {
        try {
          this._generateResolvers(
            emitterHashString,
            createProtocol(JSON.parse(emitterPtcl.string))
          );
        } catch (err) {
          validationErr = err;
        }
      }
    }
  }

  // We use the handshake response's meta field to transmit an eventual error
  // to the client. This will let us display a more useful message later on.
  var serverMatch = serverHashString === ptcl._hashString;
  var match = validationErr ? 'NONE' : serverMatch ? 'BOTH' : 'CLIENT';
  var serverProtocol = serverMatch ? null : {string: ptcl.toString()};
  var serverHash = serverMatch ?
    null :
    {'org.apache.avro.ipc.MD5': getHash(ptcl)};
  var meta = validationErr ?
    {map: {error: new Buffer(validationErr.message)}} :
    null;
  var handshakeRes = new HandshakeResponse(
    match, serverProtocol, serverHash, meta
  );

  this.emit('handshake', handshakeReq, handshakeRes);
  safeWrite(resTap, HANDSHAKE_RESPONSE_TYPE, handshakeRes);
  return validationErr === null;
};
| |
/**
 * Decode a request using the resolvers cached for the current emitter.
 *
 * @param tap {Tap} Tap positioned at the request.
 * @param message {Message} Message being called.
 * @return {Object} The decoded request.
 *
 * Throws `'invalid request'` if the tap is invalid after reading.
 *
 */
MessageListener.prototype._decodeRequest = function (tap, message) {
  var requestType = getResolvers(
    this, this._emitterHashString, message
  ).requestType;
  var request = requestType._read(tap);
  if (tap.isValid()) {
    return request;
  }
  throw new Error('invalid request');
};
| |
/**
 * Write an error response: a `true` error flag followed by the error,
 * converted with `avroError` and encoded as a system error.
 *
 * @param tap {Tap} Tap written to (grown if needed).
 * @param err {Error} The error to transmit.
 *
 */
MessageListener.prototype._encodeSystemError = function (tap, err) {
  var hasError = true;
  safeWrite(tap, BOOLEAN_TYPE, hasError);

  var systemError = avroError(err);
  safeWrite(tap, SYSTEM_ERROR_TYPE, systemError);
};
| |
/**
 * Write a handler's outcome to the tap: an error flag followed by either the
 * response or the error.
 *
 * @param tap {Tap} Tap written to (grown if needed).
 * @param message {Message} Message being answered.
 * @param err {...} Error passed by the handler; `null` or `undefined` when
 * the call succeeded.
 * @param res {...} Response value, used only when there is no error.
 *
 * If the value can't be encoded, the tap is rewound and a system error is
 * written instead.
 *
 */
MessageListener.prototype._encodeArguments = function (
  tap, message, err, res
) {
  // Handlers commonly signal success with `cb(undefined, res)` as well as
  // `cb(null, res)`; both mean "no error". Treating `undefined` as an error
  // would discard a valid response.
  var noError = err === undefined || err === null;
  var pos = tap.pos;
  safeWrite(tap, BOOLEAN_TYPE, !noError);
  try {
    if (noError) {
      safeWrite(tap, message.responseType, res);
    } else {
      if (err instanceof Error) {
        // Convenience to allow emitter to use JS errors inside handlers.
        err = avroError(err);
      }
      safeWrite(tap, message.errorType, err);
    }
  } catch (writeErr) {
    // Discard the flag and any partial write before reporting the failure.
    tap.pos = pos;
    this._encodeSystemError(tap, writeErr);
  }
};
| |
/**
 * Stop listening and, once possible, disconnect the output stream.
 *
 * @param noWait {Boolean} When truthy, don't wait for pending responses.
 *
 * `'_eot'` is emitted with the number of pending calls when `noWait` is set
 * or nothing is pending.
 *
 */
MessageListener.prototype.destroy = function (noWait) {
  var isFirstCall = !this._destroyed;
  if (isFirstCall) {
    // Stop listening. This will also correctly push back any unused bytes into
    // the readable stream (via `MessageDecoder`'s `unpipe` handler).
    this._readable.unpipe(this._decoder);
  }
  this._destroyed = true;

  var mustWait = !noWait && this._pending;
  if (mustWait) {
    return;
  }
  this._encoder.unpipe(this._writable);
  this.emit('_eot', this._pending);
};
| |
/**
 * Listener for stateless transport.
 *
 * This listener expects a handshake to precede each message, and handles a
 * single message.
 *
 * @param ptcl {Protocol} The listener's protocol.
 * @param readableFactory {Function} Called with a callback accepting the
 * writable stream the response goes to; returns the request's readable
 * stream.
 * @param opts {Object} Optional settings, see `MessageListener`.
 *
 */
function StatelessListener(ptcl, readableFactory, opts) {
  MessageListener.call(this, ptcl, opts);

  // The handshake response and the call's response share this tap.
  this._tap = new Tap(new Buffer(this._bufferSize));
  this._message = undefined;

  var self = this;

  var requestStream = readableFactory(function (writable) {
    // The encoder will buffer writes that happen before this function is
    // called, so we don't need to do any special handling.
    var piped = self._encoder.pipe(writable);
    self._writable = piped.on('finish', onEnd);
  });
  this._readable = requestStream;

  requestStream.pipe(this._decoder)
    .on('data', onRequestData)
    .on('end', onEnd);

  function onEnd() {
    self.destroy();
  }

  function onRequestData(buf) {
    self._pending++;
    self.destroy(); // Only one message per stateless listener.

    var reqTap = new Tap(buf);
    var isValid = self._validateHandshake(reqTap, self._tap);
    if (!isValid) {
      onResponse(new Error('invalid handshake'));
      return;
    }

    var name, req;
    try {
      self._idType._read(reqTap); // Skip metadata.
      name = STRING_TYPE._read(reqTap);
      self._message = self._ptcl._messages[name];
      if (!self._message) {
        throw new Error(f('unknown message: %s', name));
      }
      req = self._decodeRequest(reqTap, self._message);
    } catch (err) {
      onResponse(err);
      return;
    }

    self.emit('_call', name, req, onResponse);
  }

  function onResponse(err, res) {
    var tap = self._tap;
    safeWrite(tap, self._idType, 0);
    if (self._message) {
      self._encodeArguments(tap, self._message, err, res);
    } else {
      // No message was resolved, so its error type is unavailable.
      self._encodeSystemError(tap, err);
    }
    self._pending--;
    self._encoder.end(tap.getValue());
  }
}
util.inherits(StatelessListener, MessageListener);
| |
/**
 * Stateful transport listener.
 *
 * A handshake is done when the listener is first opened, then all messages
 * are sent without one.
 *
 * @param ptcl {Protocol} The listener's protocol.
 * @param readable {Stream} Stream requests are read from.
 * @param writable {Stream} Stream responses are written to.
 * @param opts {Object} Optional settings, see `MessageListener`.
 *
 */
function StatefulListener(ptcl, readable, writable, opts) {
  MessageListener.call(this, ptcl, opts);

  this._readable = readable;
  this._writable = writable;

  var self = this;

  function destroySelf() {
    self.destroy();
  }

  readable
    .pipe(this._decoder)
    .on('data', onHandshakeData)
    .on('end', destroySelf);

  this._encoder
    .pipe(writable)
    .on('finish', destroySelf);

  function onHandshakeData(buf) {
    var reqTap = new Tap(buf);
    var resTap = new Tap(new Buffer(self._bufferSize));
    var isValid = self._validateHandshake(reqTap, resTap);
    if (isValid) {
      // Handshake done: everything that follows is a request.
      self._decoder
        .removeListener('data', onHandshakeData)
        .on('data', onRequestData);
    }
    self._encoder.write(resTap.getValue());
  }

  function onRequestData(buf) {
    var reqTap = new Tap(buf);
    var resTap = new Tap(new Buffer(self._bufferSize));

    var id = 0;
    try {
      // The ID read is negated and coerced to a 32-bit integer.
      id = -self._idType._read(reqTap) | 0;
      if (!id) {
        throw new Error('missing ID');
      }
    } catch (err) {
      self.emit('error', new Error('invalid metadata: ' + err.message));
      return;
    }

    self._pending++;
    var name, message, req;
    try {
      name = STRING_TYPE._read(reqTap);
      message = self._ptcl._messages[name];
      if (!message) {
        throw new Error('unknown message: ' + name);
      }
      req = self._decodeRequest(reqTap, message);
    } catch (err) {
      onResponse(err);
      return;
    }

    if (!message.oneWay) {
      self.emit('_call', name, req, onResponse);
      return;
    }
    // One-way messages get no response.
    self.emit('_call', name, req);
    self._pending--;

    function onResponse(err, res) {
      self._pending--;
      safeWrite(resTap, self._idType, id);
      if (message) {
        self._encodeArguments(resTap, message, err, res);
      } else {
        self._encodeSystemError(resTap, err);
      }
      self._encoder.write(resTap.getValue(), undefined, function () {
        var isDrained = !self._pending && self._destroyed;
        if (isDrained) {
          self.destroy(); // For real this time.
        }
      });
    }
  }
}
util.inherits(StatefulListener, MessageListener);
| |
| // Helpers. |
| |
/**
 * An Avro message.
 *
 * @param name {String} Message name.
 * @param attrs {Object} Message declaration: `request` (fields), `response`
 * (required), and optional `errors` and `one-way`. It is not modified.
 * @param opts {Object} Options forwarded to `schemas.createType`.
 *
 * Throws if the response is missing, or if `one-way` is set on a message with
 * a non-null response or with declared errors.
 *
 */
function Message(name, attrs, opts) {
  this.name = name;

  this.requestType = schemas.createType({
    name: name,
    type: 'request',
    fields: attrs.request
  }, opts);

  if (!attrs.response) {
    throw new Error('missing response');
  }
  this.responseType = schemas.createType(attrs.response, opts);

  // The implicit `'string'` branch goes first. Build a new array rather than
  // `unshift`ing into `attrs.errors`: mutating the caller's declaration would
  // add the branch again each time the same declaration is reused.
  var errors = ['string'].concat(attrs.errors || []);
  this.errorType = schemas.createType(errors, opts);

  this.oneWay = !!attrs['one-way'];
  if (this.oneWay) {
    if (
      !(this.responseType instanceof schemas.types.NullType) ||
      errors.length > 1
    ) {
      throw new Error('unapplicable one-way parameter');
    }
  }
}
| |
/**
 * JSON representation of the message: its request fields, its response type
 * and, when any were declared, its errors (the leading `'string'` branch is
 * left out).
 *
 */
Message.prototype.toJSON = function () {
  var json = {
    request: this.requestType.getFields(),
    response: this.responseType
  };

  var declaredErrors = this.errorType.getTypes().slice(1);
  if (declaredErrors.length > 0) {
    json.errors = schemas.createType(declaredErrors);
  }
  return json;
};
| |
/**
 * "Framing" stream.
 *
 * @param frameSize {Number} (Maximum) size in bytes of each frame. The last
 * frame might be shorter. The value is truncated to a 32-bit integer and must
 * then be strictly positive, otherwise an error is thrown.
 *
 */
function MessageEncoder(frameSize) {
  stream.Transform.call(this);

  var size = frameSize | 0;
  this._frameSize = size;
  if (!(size > 0)) {
    throw new Error('invalid frame size');
  }
}
util.inherits(MessageEncoder, stream.Transform);
| |
/**
 * Frame one message.
 *
 * The payload is cut into frames of at most `_frameSize` bytes, each preceded
 * by its length (see `intBuffer`), and followed by a zero-length terminator.
 * Everything is pushed downstream as a single buffer.
 *
 * An empty payload produces the terminator only. Emitting an empty frame
 * first would itself read as a terminator, so the message would decode as two
 * empty messages.
 *
 * @param buf {Buffer} Message payload.
 * @param encoding {String} Unused.
 * @param cb {Function} Transform callback.
 *
 */
MessageEncoder.prototype._transform = function (buf, encoding, cb) {
  var frameSize = this._frameSize;
  var length = buf.length;
  var frames = [];

  for (var start = 0; start < length; start += frameSize) {
    var end = Math.min(start + frameSize, length);
    frames.push(intBuffer(end - start), buf.slice(start, end));
  }
  frames.push(intBuffer(0)); // Terminator.

  cb(null, Buffer.concat(frames));
};
| |
/**
 * "Un-framing" stream.
 *
 * @param noEmpty {Boolean} Emit an error if the decoder ends before emitting a
 * single message.
 *
 * This stream should only be used by being piped/unpiped to. Otherwise there
 * is a risk that too many bytes get consumed from the source stream (i.e.
 * data corresponding to a partial message might be lost).
 *
 */
function MessageDecoder(noEmpty) {
  stream.Transform.call(this);
  this._buf = new Buffer(0); // Bytes not yet forming a complete frame.
  this._bufs = []; // Frames of the message being assembled.
  this._length = 0; // Total size of `_bufs`; -1 once the data is unusable.
  this._empty = !!noEmpty; // Whether ending now should be reported.

  this.on('unpipe', function (src) {
    if (~this._length && !src._readableState.ended) {
      // Not ideal to rely on this to check whether we can unshift, but the
      // official documentation mentions it (in the context of the read
      // buffers) so it should be stable. Alternatives are more complex,
      // costly (e.g. attaching a handler on pipe), and not as fool-proof
      // (the stream might have ended earlier).
      this._bufs.push(this._buf);
      src.unshift(Buffer.concat(this._bufs));
    }
  });
}
util.inherits(MessageDecoder, stream.Transform);

/**
 * Consume a chunk, pushing one buffer downstream per complete message (i.e.
 * each time a zero-length frame is read).
 *
 * A negative frame length fails the stream with `'invalid frame length'`: it
 * can never be satisfied and would otherwise corrupt the decoder's state (a
 * length of -4 would even loop forever, since no bytes would be consumed).
 *
 */
MessageDecoder.prototype._transform = function (buf, encoding, cb) {
  buf = Buffer.concat([this._buf, buf]);

  while (buf.length >= 4) {
    var frameLength = buf.readInt32BE(0);
    if (frameLength < 0) {
      this._length = -1; // Don't unshift data on incoming unpipe.
      this._bufs = [];
      this._buf = new Buffer(0);
      cb(new Error('invalid frame length'));
      return;
    }
    if (buf.length < frameLength + 4) {
      break; // Incomplete frame, wait for more data.
    }

    if (frameLength) {
      this._bufs.push(buf.slice(4, frameLength + 4));
      this._length += frameLength;
    } else {
      var message = Buffer.concat(this._bufs, this._length);
      this._empty = false;
      this._length = 0;
      this._bufs = [];
      this.push(message);
    }
    buf = buf.slice(frameLength + 4);
  }

  this._buf = buf;
  cb();
};

/**
 * Report incomplete input when the writable side ends, then finish.
 *
 * The callback must be invoked for the stream to complete: on Node.js
 * versions where flushing gates `'finish'`, never calling it means the stream
 * never ends. Calling it also ends the readable side, which is why no
 * separate `'finish'` handler is needed for that.
 *
 * @param cb {Function} Flush callback supplied by the stream machinery.
 *
 */
MessageDecoder.prototype._flush = function (cb) {
  if (this._length || this._buf.length) {
    this._length = -1; // Don't unshift data on incoming unpipe.
    this.emit('error', new Error('trailing data'));
  } else if (this._empty) {
    this.emit('error', new Error('no message decoded'));
  }
  if (typeof cb == 'function') {
    cb();
  }
};
| |
/**
 * Default ID generator, using Avro messages' metadata field.
 *
 * This is required for stateful emitters to work and can be overridden to read
 * or write arbitrary metadata. Note that the message contents are
 * (intentionally) not available when updating this metadata.
 *
 * @param attrs {Object} Forwarded to `LogicalType`.
 * @param opts {Object} Forwarded to `LogicalType`.
 *
 */
function IdType(attrs, opts) {
  schemas.types.LogicalType.call(this, attrs, opts);
}
util.inherits(IdType, schemas.types.LogicalType);

/**
 * Extract the numeric ID from a metadata value.
 *
 * @param val {Object} Metadata; its `id` entry is read.
 *
 * Returns the entry decoded as a big-endian 32-bit integer when it is exactly
 * 4 bytes long, and 0 otherwise (including when it is missing).
 *
 */
IdType.prototype._fromValue = function (val) {
  var idBuf = val.id;
  if (idBuf && idBuf.length === 4) {
    return idBuf.readInt32BE(0);
  }
  return 0;
};

/**
 * Build the metadata value holding an ID.
 *
 * @param any {...} ID, coerced to a 32-bit integer.
 *
 */
IdType.prototype._toValue = function (any) {
  var id = any | 0;
  return {id: intBuffer(id)};
};

/**
 * Instantiate a metadata type over a map of bytes.
 *
 * @param Type {Function} Optional constructor to use instead of `IdType`.
 *
 */
IdType.createMetadataType = function (Type) {
  var Ctor = Type || IdType;
  return new Ctor({type: 'map', values: 'bytes'});
};
| |
/**
 * Returns a new 4-byte buffer containing an integer's big-endian
 * representation.
 *
 * @param n {Number} Integer.
 *
 */
function intBuffer(n) {
  var out = new Buffer(4);
  out.writeInt32BE(n, 0); // All 4 bytes are overwritten.
  return out;
}
| |
/**
 * Write a value to a tap, growing the tap's buffer if the write didn't fit.
 *
 * @param tap {Tap} Tap written to.
 * @param type {Type} Avro type.
 * @param val {...} Corresponding Avro value.
 *
 */
function safeWrite(tap, type, val) {
  var start = tap.pos;
  type._write(tap, val);
  if (tap.isValid()) {
    return;
  }

  // The write didn't fit. Replace the buffer with one sized to the position
  // reached by the failed attempt, keep the bytes preceding the write, and
  // write the value again from where it started.
  var grown = new Buffer(tap.pos);
  tap.buf.copy(grown, 0, 0, start);
  tap.buf = grown;
  tap.pos = start;
  type._write(tap, val);
}
| |
/**
 * Default callback when not provided.
 *
 * Does nothing when `err` is falsy. Otherwise throws: an object with a truthy
 * `string` property is first replaced by that property, and a string is
 * wrapped in an `Error`; anything else is thrown as is.
 *
 * @param err {...} Error, if any.
 *
 */
function throwError(err) {
  if (!err) {
    return;
  }
  var cause = (typeof err === 'object' && err.string) ? err.string : err;
  throw typeof cause === 'string' ? new Error(cause) : cause;
}
| |
/**
 * Convert an error message into a format suitable for RPC.
 *
 * @param err {Error|String} Error message. An `Error` instance is replaced by
 * its `message`; the result is wrapped as `{string: ...}`.
 *
 */
function avroError(err) {
  var message = err instanceof Error ? err.message : err;
  return {string: message};
}
| |
/**
 * Asynchronous error handling.
 *
 * The callback is invoked on the next tick rather than synchronously.
 *
 * @param ctx {...} Value used as `this` when calling `cb`.
 * @param cb {Function} Callback.
 * @param err {...} Error, passed as first argument to `cb` after being
 * converted by `avroError`.
 * @param res {...} Response. Passed as second argument to `cb`.
 *
 */
function asyncAvroCb(ctx, cb, err, res) {
  process.nextTick(function () {
    var wrapped = avroError(err);
    cb.call(ctx, wrapped, res);
  });
}
| |
/**
 * Convenience function to get a protocol's hash.
 *
 * @param ptcl {Protocol} Any protocol. Its `_hashString` is decoded using the
 * 'binary' encoding.
 *
 */
function getHash(ptcl) {
  var hashString = ptcl._hashString;
  return new Buffer(hashString, 'binary');
}
| |
/**
 * Whether an emitter or listener can resolve messages from a hash string.
 *
 * This is the case when resolvers are registered under that hash, or when the
 * hash is the one of the emitter's own protocol.
 *
 * @param emitter {MessageEmitter|MessageListener}
 * @param hashString {String}
 *
 */
function canResolve(emitter, hashString) {
  if (emitter._resolvers[hashString]) {
    return true;
  }
  return hashString === emitter._ptcl._hashString;
}
| |
/**
 * Retrieve resolvers for a given hash string.
 *
 * When the hash is the one of the emitter's own protocol, the message itself
 * is returned. Otherwise the entry registered for the message's name under
 * that hash is returned (or a falsy value if the hash is unknown).
 *
 * @param emitter {MessageEmitter|MessageListener}
 * @param hashString {String}
 * @param message {Message}
 *
 */
function getResolvers(emitter, hashString, message) {
  if (hashString !== emitter._ptcl._hashString) {
    var known = emitter._resolvers[hashString];
    return known && known[message.name];
  }
  return message;
}
| |
/**
 * Check whether something is a stream.
 *
 * @param any {Object} Any object. It is considered a stream when it has a
 * truthy `pipe` property.
 *
 */
function isStream(any) {
  // This is a hacky way of checking that the transport is a stream-like
  // object. We unfortunately can't use `instanceof Stream` checks since
  // some libraries (e.g. websocket-stream) return streams which don't
  // inherit from it.
  return Boolean(any.pipe);
}
| |
| |
| module.exports = { |
| HANDSHAKE_REQUEST_TYPE: HANDSHAKE_REQUEST_TYPE, |
| HANDSHAKE_RESPONSE_TYPE: HANDSHAKE_RESPONSE_TYPE, |
| IdType: IdType, |
| Message: Message, |
| Protocol: Protocol, |
| createProtocol: createProtocol, |
| emitters: { |
| StatefulEmitter: StatefulEmitter, |
| StatelessEmitter: StatelessEmitter |
| }, |
| listeners: { |
| StatefulListener: StatefulListener, |
| StatelessListener: StatelessListener |
| }, |
| streams: { |
| MessageDecoder: MessageDecoder, |
| MessageEncoder: MessageEncoder |
| }, |
| throwError: throwError |
| }; |