blob: df90c7d150104077f04712cac17e950d3b107abc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const assert = require('assert');
const events = require('events');
const Encoder = require('../../lib/encoder');
const streams = require('../../lib/streams');
const errors = require('../../lib/errors');
const types = require('../../lib/types');
const utils = require('../../lib/utils');
const helper = require('../test-helper');
/**
* Tests for the transform streams that are involved in the reading of a response
*/
describe('Parser', function () {
describe('#_transform()', function () {
it('should read a READY opcode', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.bodyLength, 0);
assert.strictEqual(item.header.opcode, types.opcodes.ready);
done();
});
parser._transform({header: getFrameHeader(0, types.opcodes.ready), chunk: utils.allocBufferFromArray([])}, null, doneIfError(done));
});
it('should read a AUTHENTICATE response', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.authenticate);
assert.ok(item.mustAuthenticate, 'it should return a mustAuthenticate return flag');
done();
});
parser._transform({ header: getFrameHeader(2, types.opcodes.authenticate), chunk: utils.allocBufferFromArray([0, 0])}, null, doneIfError(done));
});
it('should buffer a AUTHENTICATE response until complete', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.authenticate);
assert.ok(item.mustAuthenticate, 'it should return a mustAuthenticate return flag');
assert.strictEqual(item.authenticatorName, 'abc');
//mocha will fail if done is called multiple times
done();
});
const header = getFrameHeader(5, types.opcodes.authenticate);
parser._transform({ header: header, chunk: utils.allocBufferFromArray([0, 3]), offset: 0}, null, doneIfError(done));
parser._transform({ header: header, chunk: utils.allocBufferFromString('a'), offset: 0}, null, doneIfError(done));
parser._transform({ header: header, chunk: utils.allocBufferFromString('bc'), offset: 0}, null, doneIfError(done));
});
it('should read a VOID result', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.bodyLength, 4);
assert.strictEqual(item.header.opcode, types.opcodes.result);
done();
});
parser._transform({
header: getFrameHeader(4, types.opcodes.result),
chunk: utils.allocBufferFromArray([0, 0, 0, types.resultKind.voidResult])
}, null, doneIfError(done));
});
it('should read a VOID result with trace id', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.bodyLength, 4);
assert.strictEqual(item.header.opcode, types.opcodes.result);
helper.assertInstanceOf(item.flags.traceId, types.Uuid);
done();
});
parser._transform({
header: getFrameHeader(4, types.opcodes.result, 2, true),
chunk: Buffer.concat([
utils.allocBufferUnsafe(16), //uuid
utils.allocBufferFromArray([0, 0, 0, types.resultKind.voidResult])
])
}, null, doneIfError(done));
});
it('should read a VOID result with trace id chunked', function (done) {
const parser = newInstance();
let responseCounter = 0;
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
responseCounter++;
});
const body = Buffer.concat([
utils.allocBufferUnsafe(16), //uuid
utils.allocBufferFromArray([0, 0, 0, types.resultKind.voidResult])
]);
parser._transform({
header: getFrameHeader(4, types.opcodes.result, 2, true),
chunk: body
}, null, doneIfError(done));
process.nextTick(() => {
assert.strictEqual(responseCounter, 1);
parser.setOptions(88, { byRow: true });
for (let i = 0; i < body.length; i++) {
parser._transform({
header: getFrameHeader(4, types.opcodes.result, 2, true, 88),
chunk: body.slice(i, i + 1),
offset: 0
}, null, doneIfError(done));
}
process.nextTick(() => {
assert.strictEqual(responseCounter, 2);
done();
});
});
});
it('should read a RESULT result with trace id chunked', function (done) {
const parser = newInstance();
let responseCounter = 0;
let byRowCompleted = false;
parser.on('readable', function () {
let item;
while ((item = parser.read())) {
if (!item.row && item.frameEnded) {
continue;
}
assert.strictEqual(item.header.opcode, types.opcodes.result);
byRowCompleted = item.byRowCompleted;
responseCounter++;
}
});
const body = Buffer.concat([
utils.allocBufferUnsafe(16), //uuid
getBodyChunks(3, 1, 0, undefined, null).chunk
]);
parser._transform({
header: getFrameHeader(body.length, types.opcodes.result, 2, true),
chunk: body,
offset: 0
}, null, doneIfError(done));
process.nextTick(() => {
assert.strictEqual(responseCounter, 1);
assert.notEqual(byRowCompleted, true);
parser.setOptions(88, { byRow: true });
for (let i = 0; i < body.length; i++) {
parser._transform({
header: getFrameHeader(4, types.opcodes.result, 2, true, 88),
chunk: body.slice(i, i + 1),
offset: 0
}, null, doneIfError(done));
}
process.nextTick(() => {
assert.strictEqual(responseCounter, 3);
assert.ok(byRowCompleted);
done();
});
});
});
it('should read a VOID result with warnings and custom payload', function (done) {
const parser = newInstance();
const body = Buffer.concat([
// 2 string list of warnings containing 'Hello', 'World'
utils.allocBufferFromString('0002000548656c6c6f0005576f726c64', 'hex'),
// Custom payload byte map of {a: 1, b: 2}
utils.allocBufferFromString('000200016100000001010001620000000102', 'hex'),
// void result indicator
utils.allocBufferFromArray([0, 0, 0, types.resultKind.voidResult])
]);
parser.on('readable', function () {
const item = parser.read();
assert.ok(!item.error);
assert.strictEqual(item.header.bodyLength, body.length);
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.flags);
assert.ok(item.flags.warnings);
assert.deepEqual(item.flags.warnings, ['Hello', 'World']);
assert.ok(item.flags.customPayload);
assert.deepEqual(item.flags.customPayload, {a: utils.allocBufferFromArray([0x01]), b: utils.allocBufferFromArray([0x02])});
done();
});
const header = getFrameHeader(body.length, types.opcodes.result, 4, false, 12, true, true);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a SET_KEYSPACE result', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.strictEqual(item.keyspaceSet, 'ks1');
done();
});
//kind + stringLength + string
const bodyLength = 4 + 2 + 3;
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.result),
chunk: utils.allocBufferFromArray([0, 0, 0, types.resultKind.setKeyspace]),
offset: 0
}, null, doneIfError(done));
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.result),
chunk: utils.allocBufferFromArray([0, 3]),
offset: 0
}, null, doneIfError(done));
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.result),
chunk: utils.allocBufferFromString('ks1'),
offset: 0
}, null, doneIfError(done));
});
it('should read a PREPARE result', function (done) {
const version = types.protocolVersion.dseV2;
const parser = newInstance(version);
const id = types.Uuid.random();
const resultMetaId = types.Uuid.random();
parser.on('readable', function () {
const item = parser.read();
assert.ifError(item.error);
assert.strictEqual(item.header.opcode, types.opcodes.result);
helper.assertInstanceOf(item.id, Buffer);
assert.strictEqual(item.id.toString('hex'), id.getBuffer().toString('hex'));
helper.assertInstanceOf(item.meta.resultId, Buffer);
assert.strictEqual(item.meta.resultId.toString('hex'), resultMetaId.getBuffer().toString('hex'));
assert.deepEqual(item.meta.partitionKeys, [4]);
done();
});
//kind +
// id length + id
// meta id length + id
// metadata (flags + columnLength + partitionKeyLength + partition key index + ksname + tblname + column name + column type) +
// result metadata (flags + columnLength + ksname + tblname + column name + column type)
const body = Buffer.concat([
utils.allocBufferFromArray([0, 0, 0, types.resultKind.prepared]),
utils.allocBufferFromArray([0, 16]),
id.getBuffer(),
utils.allocBufferFromArray([0, 16]),
resultMetaId.getBuffer(),
utils.allocBufferFromArray([0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 4, 0, 1, 62, 0, 1, 63, 0, 1, 61, 0, types.dataTypes.text]),
utils.allocBufferFromArray([0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 62, 0, 1, 63, 0, 1, 61, 0, types.dataTypes.text])
]);
const bodyLength = body.length;
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.result, version),
chunk: body.slice(0, 22),
offset: 0
}, null, doneIfError(done));
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.result, version),
chunk: body.slice(22, 41),
offset: 0
}, null, doneIfError(done));
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.result, version),
chunk: body.slice(41),
offset: 0
}, null, doneIfError(done));
});
it('should read a PREPARE V2 result', function (done) {
const parser = newInstance();
const id = types.Uuid.random();
parser.on('readable', function () {
const item = parser.read();
assert.ifError(item.error);
assert.strictEqual(item.header.opcode, types.opcodes.result);
helper.assertInstanceOf(item.id, Buffer);
assert.strictEqual(item.id.toString('hex'), id.getBuffer().toString('hex'));
done();
});
//kind +
// id length + id
// metadata (flags + columnLength + ksname + tblname + column name + column type) +
// result metadata (flags + columnLength + ksname + tblname + column name + column type)
const body = Buffer.concat([
utils.allocBufferFromArray([0, 0, 0, types.resultKind.prepared]),
utils.allocBufferFromArray([0, 16]),
id.getBuffer(),
utils.allocBufferFromArray([0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 62, 0, 1, 63, 0, 1, 61, 0, types.dataTypes.text]),
utils.allocBufferFromArray([0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 62, 0, 1, 63, 0, 1, 61, 0, types.dataTypes.text])
]);
const bodyLength = body.length;
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.result),
chunk: body.slice(0, 22),
offset: 0
}, null, doneIfError(done));
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.result),
chunk: body.slice(22, 41),
offset: 0
}, null, doneIfError(done));
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.result),
chunk: body.slice(41),
offset: 0
}, null, doneIfError(done));
});
it('should read a STATUS_CHANGE UP EVENT response', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.event);
assert.ok(item.event, 'it should return the details of the event');
assert.strictEqual(item.event.up, true);
done();
});
const eventData = getEventData('STATUS_CHANGE', 'UP');
parser._transform(eventData, null, doneIfError(done));
});
it('should read a STATUS_CHANGE DOWN EVENT response', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.event);
assert.ok(item.event, 'it should return the details of the event');
assert.strictEqual(item.event.up, false);
done();
});
const eventData = getEventData('STATUS_CHANGE', 'DOWN');
parser._transform(eventData, null, doneIfError(done));
});
it('should read a STATUS_CHANGE DOWN EVENT response chunked', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.event);
assert.ok(item.event, 'it should return the details of the event');
assert.strictEqual(item.event.up, false);
done();
});
const eventData = getEventData('STATUS_CHANGE', 'DOWN');
const chunk1 = eventData.chunk.slice(0, 5);
const chunk2 = eventData.chunk.slice(5);
parser._transform({header: eventData.header, chunk: chunk1, offset: 0}, null, doneIfError(done));
parser._transform({header: eventData.header, chunk: chunk2, offset: 0}, null, doneIfError(done));
});
it('should read an ERROR response that includes warnings', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.error);
assert.ok(item.error);
helper.assertInstanceOf(item.error, errors.ResponseError);
assert.strictEqual(item.error.message, "Fail");
assert.strictEqual(item.error.code, 0); // Server Error
done();
});
const body = Buffer.concat([
utils.allocBufferFromString('0002000548656c6c6f0005576f726c64', 'hex'), // 2 string list of warnings containing 'Hello', 'World'
utils.allocBufferFromString('0000000000044661696c', 'hex') // Server Error Code (0x0000) with 4 length message 'Fail'
]);
const bodyLength = body.length;
const header = getFrameHeader(bodyLength, types.opcodes.error, 4, false, 12, true);
parser._transform({
header: header,
chunk: body.slice(0, 4),
offset: 0
}, null, doneIfError(done));
parser._transform({
header: header,
chunk: body.slice(4, 10),
offset: 0
}, null, doneIfError(done));
parser._transform({
header: header,
chunk: body.slice(10),
offset: 0
}, null, doneIfError(done));
});
it('should read an UNAVAILABLE', function (done) {
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.unavailableException);
assert.strictEqual(msg.error.consistencies, types.consistencies.localQuorum);
assert.strictEqual(msg.error.required, 5);
assert.strictEqual(msg.error.alive, 4);
assert.strictEqual(msg.error.message, 'Not enough replicas available for query at consistency LOCAL_QUORUM (5 required but only 4 alive)');
done();
});
// Unavailable at LOCAL_QUORUM with 5 required and 4 alive.
const bodyArray = [];
// Unavailable (0x1000)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x10, 0]));
// No Message
bodyArray.push(utils.allocBufferFromArray([0, 0]));
// LOCAL_QUORUM, with 5 required and 4 alive.
bodyArray.push(utils.allocBufferFromArray([0, 6, 0, 0, 0, 5, 0, 0, 0, 4]));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a READ_TIMEOUT with not enough received', function (done) {
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.readTimeout);
assert.strictEqual(msg.error.consistencies, types.consistencies.two);
assert.strictEqual(msg.error.received, 1);
assert.strictEqual(msg.error.blockFor, 2);
assert.strictEqual(msg.error.isDataPresent, 0);
assert.strictEqual(msg.error.message, 'Server timeout during read query at consistency TWO (1 replica(s) responded over 2 required)');
done();
});
// Read Timeout at TWO with 1 received 2 block for and no data present
const bodyArray = [];
// Read Timeout (0x1200)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x12, 0]));
// No Message
bodyArray.push(utils.allocBufferFromArray([0, 0]));
// TWO, 1 received, 2 block for, no data
bodyArray.push(utils.allocBufferFromArray([0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0]));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a READ_TIMEOUT with no data present', function (done) {
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.readTimeout);
assert.strictEqual(msg.error.consistencies, types.consistencies.two);
assert.strictEqual(msg.error.received, 2);
assert.strictEqual(msg.error.blockFor, 2);
assert.strictEqual(msg.error.isDataPresent, 0);
assert.strictEqual(msg.error.message, 'Server timeout during read query at consistency TWO (the replica queried for the data didn\'t respond)');
done();
});
// Read Timeout at TWO with 2 received 2 block for and no data present
const bodyArray = [];
// Read Timeout (0x1200)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x12, 0]));
// No Message
bodyArray.push(utils.allocBufferFromArray([0, 0]));
// TWO, 2 received, 2 block for, no data
bodyArray.push(utils.allocBufferFromArray([0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 0]));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a READ_TIMEOUT with repair timeout', function (done) {
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.readTimeout);
assert.strictEqual(msg.error.consistencies, types.consistencies.two);
assert.strictEqual(msg.error.received, 2);
assert.strictEqual(msg.error.blockFor, 2);
assert.strictEqual(msg.error.isDataPresent, 1);
assert.strictEqual(msg.error.message, 'Server timeout during read query at consistency TWO (timeout while waiting for repair of inconsistent replica)');
done();
});
// Read Timeout at TWO with 2 received 2 block for and no data present
const bodyArray = [];
// Read Timeout (0x1200)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x12, 0]));
// No Message
bodyArray.push(utils.allocBufferFromArray([0, 0]));
// TWO, 2 received, 2 block for, data present
bodyArray.push(utils.allocBufferFromArray([0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 1]));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a READ_FAILURE', function (done) {
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.readFailure);
assert.strictEqual(msg.error.consistencies, types.consistencies.eachQuorum);
assert.strictEqual(msg.error.received, 3);
assert.strictEqual(msg.error.blockFor, 5);
assert.strictEqual(msg.error.failures, 2);
assert.strictEqual(msg.error.isDataPresent, 1);
assert.strictEqual(msg.error.message, 'Server failure during read query at consistency EACH_QUORUM (5 responses were required but only 3 replicas responded, 2 failed)');
done();
});
// Read Timeout at TWO with 2 received 2 block for and no data present
const bodyArray = [];
// Read Failure (0x1300)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x13, 0]));
// No Message
bodyArray.push(utils.allocBufferFromArray([0, 0]));
// EACH_QUORUM, 3 received, 5 block for, 2 failures, data present
bodyArray.push(utils.allocBufferFromArray([0, 7, 0, 0, 0, 3, 0, 0, 0, 5, 0, 0, 0, 2, 1]));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a SIMPLE WRITE_TIMEOUT', function (done) {
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.writeTimeout);
assert.strictEqual(msg.error.consistencies, types.consistencies.quorum);
assert.strictEqual(msg.error.received, 1);
assert.strictEqual(msg.error.blockFor, 3);
assert.strictEqual(msg.error.writeType, 'SIMPLE');
assert.strictEqual(msg.error.message, 'Server timeout during write query at consistency QUORUM (1 peer(s) acknowledged the write over 3 required)');
done();
});
// write timeout at consistency quorum with 1 of 3 replicas responding.
const bodyArray = [];
// Write Timeout (0x1100)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x11, 0]));
// No Message
bodyArray.push(utils.allocBufferFromArray([0, 0]));
// Quorum, with 1 received and 3 block for
bodyArray.push(utils.allocBufferFromArray([0, 4, 0, 0, 0, 1, 0, 0, 0, 3]));
// Write Type 'SIMPLE'
bodyArray.push(utils.allocBufferFromArray([0, 'SIMPLE'.length]));
bodyArray.push(utils.allocBufferFromString('SIMPLE'));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a BATCH_LOG WRITE_TIMEOUT', function (done) {
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.writeTimeout);
assert.strictEqual(msg.error.consistencies, types.consistencies.one);
assert.strictEqual(msg.error.received, 0);
assert.strictEqual(msg.error.blockFor, 1);
assert.strictEqual(msg.error.writeType, 'BATCH_LOG');
assert.strictEqual(msg.error.message, 'Server timeout during batchlog write at consistency ONE (0 peer(s) acknowledged the write over 1 required)');
done();
});
// batchlog write timeout at consistency quorum with 0 of 1 replicas responding.
const bodyArray = [];
// Write Timeout (0x1100)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x11, 0]));
// No Message
bodyArray.push(utils.allocBufferFromArray([0, 0]));
// ONE, with 0 received and 1 block for
bodyArray.push(utils.allocBufferFromArray([0, 1, 0, 0, 0, 0, 0, 0, 0, 1]));
// Write Type 'BATCH_LOG'
bodyArray.push(utils.allocBufferFromArray([0, 'BATCH_LOG'.length]));
bodyArray.push(utils.allocBufferFromString('BATCH_LOG'));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a WRITE_FAILURE', function (done) {
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.writeFailure);
assert.strictEqual(msg.error.consistencies, types.consistencies.three);
assert.strictEqual(msg.error.received, 2);
assert.strictEqual(msg.error.blockFor, 3);
assert.strictEqual(msg.error.failures, 1);
assert.strictEqual(msg.error.writeType, 'COUNTER');
assert.strictEqual(msg.error.message, 'Server failure during write query at consistency THREE (3 responses were required but only 2 replicas responded, 1 failed)');
done();
});
// batchlog write timeout at consistency quorum with 0 of 1 replicas responding.
const bodyArray = [];
// Write Timeout (0x1500)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x15, 0]));
// No Message
bodyArray.push(utils.allocBufferFromArray([0, 0]));
// THREE, with 2 received, 3 block for, 1 failures
bodyArray.push(utils.allocBufferFromArray([0, 3, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 1]));
// Write Type 'COUNTER'
bodyArray.push(utils.allocBufferFromArray([0, 'COUNTER'.length]));
bodyArray.push(utils.allocBufferFromString('COUNTER'));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read an UNPREPARED', function (done) {
const message = 'No query prepared with ID 0x8675';
const id = utils.allocBufferFromArray([0x86, 0x75]);
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.unprepared);
assert.deepEqual(msg.error.queryId, id);
assert.strictEqual(msg.error.message, message);
done();
});
// Unprepared with ID 0x8675
const bodyArray = [];
// Unprepared (0x2500)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x25, 0]));
// ID 0x8675 was Not Prepared
bodyArray.push(utils.allocBufferFromArray([0, message.length]));
bodyArray.push(utils.allocBufferFromString(message));
// 0x8675
bodyArray.push(utils.allocBufferFromArray([0, 2]));
bodyArray.push(id);
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a FUNCTION_FAILURE', function (done) {
const message = "Could not execute function";
const keyspace = 'myks';
const functionName = 'foo';
const argTypes = ['int', 'varchar', 'blob'];
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.functionFailure);
assert.strictEqual(msg.error.keyspace, keyspace);
assert.strictEqual(msg.error.functionName, functionName);
assert.deepEqual(msg.error.argTypes, argTypes);
assert.strictEqual(msg.error.message, message);
done();
});
// Unprepared with ID 0x8675
const bodyArray = [];
// Function Failure 0x1400)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x14, 0]));
// Error Message
bodyArray.push(utils.allocBufferFromArray([0, message.length]));
bodyArray.push(utils.allocBufferFromString(message));
// Keyspace
bodyArray.push(utils.allocBufferFromArray([0, keyspace.length]));
bodyArray.push(utils.allocBufferFromString(keyspace));
// Function Name
bodyArray.push(utils.allocBufferFromArray([0, functionName.length]));
bodyArray.push(utils.allocBufferFromString(functionName));
// Arguments
bodyArray.push(utils.allocBufferFromArray([0, argTypes.length]));
argTypes.forEach(function (arg) {
bodyArray.push(utils.allocBufferFromArray([0, arg.length]));
bodyArray.push(utils.allocBufferFromString(arg));
});
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read an ALREADY_EXISTS for Table', function (done) {
const message = 'Table already exists!';
const keyspace = 'myks';
const table = 'tbl';
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.alreadyExists);
assert.strictEqual(msg.error.keyspace, keyspace);
assert.strictEqual(msg.error.table, table);
assert.strictEqual(msg.error.message, message);
done();
});
// Already Exists for Table
const bodyArray = [];
// Already Exists 0x2400)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x24, 0]));
// Error Message
bodyArray.push(utils.allocBufferFromArray([0, message.length]));
bodyArray.push(utils.allocBufferFromString(message));
// Keyspace
bodyArray.push(utils.allocBufferFromArray([0, keyspace.length]));
bodyArray.push(utils.allocBufferFromString(keyspace));
// Table
bodyArray.push(utils.allocBufferFromArray([0, table.length]));
bodyArray.push(utils.allocBufferFromString(table));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read an ALREADY_EXISTS for Keyspace', function (done) {
const message = 'Keyspace already exists!';
const keyspace = 'myks';
const parser = buildParserAndExpect(function (msg) {
assert.ok(msg.error);
helper.assertInstanceOf(msg.error, errors.ResponseError);
assert.strictEqual(msg.error.code, types.responseErrorCodes.alreadyExists);
assert.strictEqual(msg.error.keyspace, keyspace);
assert.ifError(msg.error.table); // table should not be present.
assert.strictEqual(msg.error.message, message);
done();
});
// Already Exists for Keyspace
const bodyArray = [];
// Already Exists 0x2400)
bodyArray.push(utils.allocBufferFromArray([0, 0, 0x24, 0]));
// Error Message
bodyArray.push(utils.allocBufferFromArray([0, message.length]));
bodyArray.push(utils.allocBufferFromString(message));
// Keyspace
bodyArray.push(utils.allocBufferFromArray([0, keyspace.length]));
bodyArray.push(utils.allocBufferFromString(keyspace));
// Table (empty string)
bodyArray.push(utils.allocBufferFromArray([0, 0]));
const body = Buffer.concat(bodyArray);
const header = getFrameHeader(body.length, types.opcodes.error, 4);
parser._transform({
header: header,
chunk: body,
offset: 0
}, null, doneIfError(done));
});
it('should read a buffer until there is enough data', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.bodyLength, 4);
assert.strictEqual(item.header.opcode, types.opcodes.result);
done();
});
parser._transform({
header: getFrameHeader(4, types.opcodes.result),
chunk: utils.allocBufferFromArray([ 0 ]),
offset: 0
}, null, doneIfError(done));
parser._transform({
header: getFrameHeader(4, types.opcodes.result),
chunk: utils.allocBufferFromArray([ 0, 0, types.resultKind.voidResult ]),
offset: 0
}, null, doneIfError(done));
});
it('should emit empty result one column no rows', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.result && item.result.rows && item.result.rows.length === 0);
done();
});
//kind
parser._transform(getBodyChunks(1, 0, 0, 4), null, doneIfError(done));
//metadata
parser._transform(getBodyChunks(1, 0, 4, 12), null, doneIfError(done));
//column names and rows
parser._transform(getBodyChunks(1, 0, 12, null), null, doneIfError(done));
});
it('should emit empty result two columns no rows', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.result && item.result.rows && item.result.rows.length === 0);
done();
});
//2 columns, no rows, in one chunk
parser._transform(getBodyChunks(2, 0, 0, null), null, doneIfError(done));
});
it('should emit row when rows present', function (done) {
const parser = newInstance();
const rowLength = 2;
let rowCounter = 0;
parser.on('readable', function () {
let item;
while ((item = parser.read())) {
if (!item.row && item.frameEnded) {
continue;
}
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.row);
if ((++rowCounter) === rowLength) {
done();
}
}
});
parser.setOptions(33, { byRow: true });
//3 columns, 2 rows
parser._transform(getBodyChunks(3, rowLength, 0, 10), null, doneIfError(done));
parser._transform(getBodyChunks(3, rowLength, 10, 32), null, doneIfError(done));
parser._transform(getBodyChunks(3, rowLength, 32, 37), null, doneIfError(done));
parser._transform(getBodyChunks(3, rowLength, 37, null), null, doneIfError(done));
});
it('should parse new_metadata_id in ROWS result', function (done) {
// Note: Given that the driver does not currently use skipMetadata, this should not be encountered
// in practice until NODEJS-433 is implemented.
const version = types.protocolVersion.dseV2;
const parser = newInstance(version);
const newId = types.Uuid.random();
parser.on('readable', function () {
const item = parser.read();
assert.ifError(item.error);
assert.strictEqual(item.header.opcode, types.opcodes.result);
helper.assertInstanceOf(item.result.meta.newResultId, Buffer);
assert.strictEqual(item.result.meta.newResultId.toString('hex'), newId.getBuffer().toString('hex'));
done();
});
const body = Buffer.concat([
utils.allocBufferFromArray([0, 0, 0, types.resultKind.rows]),
utils.allocBufferFromArray([0, 0, 0, 9]), // flags = metadata changed, global table spec
utils.allocBufferFromArray([0, 0, 0, 1]), // column count = 1
utils.allocBufferFromArray([0, 16]), // id length
newId.getBuffer(),
utils.allocBufferFromArray([0, 1, 0x62, 0, 1, 0x63, 0, 1, 0x61, 0, types.dataTypes.text]), // keyspace, table, column (of type text)
utils.allocBufferFromArray([0, 0, 0, 0]) // 0 rows.
]);
parser._transform({
header: getFrameHeader(body.length, types.opcodes.result, version),
chunk: body,
offset: 0
}, null, doneIfError(done));
});
describe('with multiple chunk lengths', function () {
const parser = newInstance();
let result;
let byRowCompleted;
parser.on('readable', function () {
let item;
while ((item = parser.read())) {
if (!item.row && item.frameEnded) {
continue;
}
byRowCompleted = item.byRowCompleted;
if (byRowCompleted) {
continue;
}
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.row);
result[item.header.streamId] = result[item.header.streamId] || [];
result[item.header.streamId].push(item.row);
}
});
[1, 3, 5, 13].forEach(function (chunkLength) {
it('should emit rows chunked with chunk length of ' + chunkLength, function (done) {
result = {};
const expected = [
{ columnLength: 3, rowLength: 10 },
{ columnLength: 5, rowLength: 5 },
{ columnLength: 6, rowLength: 15 },
{ columnLength: 6, rowLength: 5 },
{ columnLength: 1, rowLength: 20 }
];
const items = expected.map(function (item, index) {
parser.setOptions(index, { byRow: true });
return getBodyChunks(item.columnLength, item.rowLength, 0, null, null, index);
});
function transformChunkedItem(i) {
const item = items[i];
const chunkedItem = {
header: item.header,
offset: 0
};
for (let j = 0; j < item.chunk.length; j = j + chunkLength) {
let end = j + chunkLength;
if (end >= item.chunk.length) {
end = item.chunk.length;
chunkedItem.frameEnded = true;
}
const start = j;
if (start === 0) {
//sum a few bytes
chunkedItem.chunk = Buffer.concat([ utils.allocBufferUnsafe(9), item.chunk.slice(start, end) ]);
chunkedItem.offset = 9;
}
else {
chunkedItem.chunk = item.chunk.slice(start, end);
chunkedItem.offset = 0;
}
parser._transform(chunkedItem, null, helper.throwop);
}
}
for (let i = 0; i < items.length; i++) {
transformChunkedItem(i);
}
process.nextTick(() => {
//assert result
expected.forEach(function (expectedItem, index) {
assert.ok(result[index], 'Result not found for index ' + index);
assert.strictEqual(result[index].length, expectedItem.rowLength);
});
done();
});
});
});
});
describe('with multiple chunk lengths piped', function () {
const expected = [
{ columnLength: 3, rowLength: 10 },
{ columnLength: 5, rowLength: 5 },
{ columnLength: 6, rowLength: 15 },
{ columnLength: 6, rowLength: 15 },
{ columnLength: 1, rowLength: 20 }
];
class TestEmitter extends events.EventEmitter {}
const emitter = new TestEmitter();
const doneParsing = new Promise((resolve) => {
let cnt = 0;
emitter.on('parseDone', () => {
cnt += 1;
if (cnt === expected.length) {
resolve();
}
});
});
const protocol = new streams.Protocol({ objectMode: true });
const parser = newInstance();
protocol.pipe(parser);
let result;
let byRowCompleted;
parser.on('readable', function () {
let item;
while ((item = parser.read())) {
if (!item.row && item.frameEnded) {
continue;
}
byRowCompleted = item.byRowCompleted;
if (byRowCompleted) {
emitter.emit('parseDone');
continue;
}
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.row);
result[item.header.streamId] = result[item.header.streamId] || [];
result[item.header.streamId].push(item.row);
}
});
[1, 2, 7, 11].forEach(function (chunkLength) {
it('should emit rows chunked with chunk length of ' + chunkLength, function (done) {
result = {};
const buffer = Buffer.concat(expected.map(function (expectedItem, index) {
parser.setOptions(index, { byRow: true });
const item = getBodyChunks(expectedItem.columnLength, expectedItem.rowLength, 0, null, null, index);
return Buffer.concat([ item.header.toBuffer(), item.chunk ]);
}));
for (let j = 0; j < buffer.length; j = j + chunkLength) {
let end = j + chunkLength;
if (end >= buffer.length) {
end = buffer.length;
}
protocol._transform(buffer.slice(j, end), null, helper.throwop);
}
doneParsing.then(() => {
process.nextTick(() => {
assert.ok(byRowCompleted);
//assert result
expected.forEach(function (expectedItem, index) {
assert.ok(result[index], 'Result not found for index ' + index);
assert.strictEqual(result[index].length, expectedItem.rowLength);
assert.strictEqual(result[index][0].keys().length, expectedItem.columnLength);
});
done();
});
});
});
});
});
it('should emit row with large row values', function (done) {
this.timeout(20000);
//3mb value
let cellValue = helper.fillArray(3 * 1024 * 1024, 74);
//Add the length 0x00300000 of the value
cellValue = [0, 30, 0, 0].concat(cellValue);
const rowLength = 1;
utils.series([function (next) {
const parser = newInstance();
let rowCounter = 0;
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.row);
if ((++rowCounter) === rowLength) {
next();
}
});
//1 columns, 1 row, 1 chunk
parser._transform(getBodyChunks(1, rowLength, 0, null, cellValue), null, doneIfError(done));
}, function (next) {
const parser = newInstance();
let rowCounter = 0;
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.row);
if ((++rowCounter) === rowLength) {
next();
}
});
//1 columns, 1 row, 2 chunks
parser._transform(getBodyChunks(1, rowLength, 0, 50, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 50, null, cellValue), null, doneIfError(done));
}, function (next) {
const parser = newInstance();
let rowCounter = 0;
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.row);
if ((++rowCounter) === rowLength) {
next();
}
});
//1 columns, 1 row, 6 chunks
parser._transform(getBodyChunks(1, rowLength, 0, 50, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 50, 60, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 60, 120, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 120, 195, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 195, 1501, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 1501, null, cellValue), null, doneIfError(done));
}, function (next) {
let cellValue = helper.fillArray(256, 74);
//Add the length 256 of the value
cellValue = [0, 0, 1, 0].concat(cellValue);
const parser = newInstance();
let rowCounter = 0;
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.row);
if ((++rowCounter) === rowLength) {
next();
}
});
//1 columns, 1 row, 6 small chunks
parser._transform(getBodyChunks(1, rowLength, 0, 50, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 50, 100, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 100, 150, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 150, 200, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(1, rowLength, 200, null, cellValue), null, doneIfError(done));
}, function (next) {
let cellValue = helper.fillArray(256, 74);
//Add the length 256 of the value
cellValue = [0, 0, 1, 0].concat(cellValue);
const parser = newInstance();
let rowCounter = 0;
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
assert.ok(item.row);
if ((++rowCounter) === rowLength) {
next();
}
});
//2 columns, 1 row, small and large chunks
parser._transform(getBodyChunks(2, rowLength, 0, 19, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(2, rowLength, 19, 20, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(2, rowLength, 20, 24, cellValue), null, doneIfError(done));
parser._transform(getBodyChunks(2, rowLength, 24, null, cellValue), null, doneIfError(done));
}], done);
});
it('should read a AUTH_CHALLENGE response', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.authChallenge);
helper.assertValueEqual(item.token, utils.allocBufferFromArray([100, 100]));
assert.strictEqual(item.authChallenge, true);
done();
});
//Length + buffer
const bodyLength = 4 + 2;
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.authChallenge),
chunk: utils.allocBufferFromArray([255, 254, 0, 0, 0, 2]),
offset: 2
}, null, doneIfError(done));
parser._transform({
header: getFrameHeader(bodyLength, types.opcodes.authChallenge),
chunk: utils.allocBufferFromArray([100, 100]),
offset: 0
}, null, doneIfError(done));
});
it('should buffer ERROR response until complete', function (done) {
const parser = newInstance();
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.error);
helper.assertInstanceOf(item.error, errors.ResponseError);
assert.strictEqual(item.error.message, 'ERR');
//mocha will fail if done is called multiple times
assert.strictEqual(parser.read(), null);
done();
});
//streamId 33
const header = new types.FrameHeader(4, 0, 33, types.opcodes.error, 9);
parser.setOptions(33, { byRow: true });
assert.strictEqual(parser.frameState({ header: header}).byRow, true);
parser._transform({ header: header, chunk: utils.allocBufferFromArray([255, 0, 0, 0, 0]), offset: 1}, null, doneIfError(done));
parser._transform({ header: header, chunk: utils.allocBufferFromArray([0, 3]), offset: 0}, null, doneIfError(done));
parser._transform({ header: header, chunk: utils.allocBufferFromString('ERR'), offset: 0}, null, doneIfError(done));
});
it('should not buffer RESULT ROWS response when byRow is enabled', function (done) {
const parser = newInstance();
const rowLength = 2;
let rowCounter = 0;
let byRowCompleted = false;
parser.on('readable', function () {
const item = parser.read();
assert.strictEqual(item.header.opcode, types.opcodes.result);
byRowCompleted = item.byRowCompleted;
if (!item.byRowCompleted) {
assert.ok(item.row);
rowCounter++;
}
});
//12 is the stream id used by the header helper by default
parser.setOptions(12, { byRow: true });
//3 columns, 2 rows
parser._transform(getBodyChunks(3, rowLength, 0, 10), null, doneIfError(done));
parser._transform(getBodyChunks(3, rowLength, 10, 32), null, doneIfError(done));
parser._transform(getBodyChunks(3, rowLength, 32, 55), null, doneIfError(done));
process.nextTick(() => {
assert.strictEqual(rowCounter, 1);
assert.notEqual(byRowCompleted, true);
parser._transform(getBodyChunks(3, rowLength, 55, null), null, doneIfError(done));
process.nextTick(() => {
assert.strictEqual(rowCounter, 2);
done();
});
});
});
});
});
/**
* @param {Number} [protocolVersion]
* @returns {exports.Parser}
*/
function newInstance(protocolVersion) {
if (!protocolVersion) {
protocolVersion = 2;
}
return new streams.Parser({objectMode:true}, new Encoder(protocolVersion, {}));
}
/**
* Test Helper method to get a frame header with stream id 12
* @returns {exports.FrameHeader}
*/
function getFrameHeader(bodyLength, opcode, version, trace, streamId, warnings, customPayload) {
if (typeof streamId === 'undefined') {
streamId = 12;
}
let flags = 0;
flags += (trace ? 0x2 : 0x0);
flags += (customPayload ? 0x4 : 0x0);
flags += (warnings ? 0x8 : 0x0);
return new types.FrameHeader(version || 2, flags, streamId, opcode, bodyLength);
}
/**
* @returns {{header: FrameHeader, chunk: Buffer, offset: number}}
*/
function getBodyChunks(columnLength, rowLength, fromIndex, toIndex, cellValue, streamId) {
let i;
let fullChunk = [
//kind
0, 0, 0, types.resultKind.rows,
//flags and column count
0, 0, 0, 1, 0, 0, 0, columnLength,
//column names
0, 1, 97, //string 'a' as ksname
0, 1, 98 //string 'b' as tablename
];
for (i = 0; i < columnLength; i++) {
fullChunk = fullChunk.concat([
0, 1, 99 + i, //string name, starting by 'c' as column name
0, types.dataTypes.text //short datatype
]);
}
//rows length
fullChunk = fullChunk.concat([0, 0, 0, rowLength || 0]);
for (i = 0; i < rowLength; i++) {
let rowChunk = [];
for (let j = 0; j < columnLength; j++) {
//4 bytes length + bytes of each column value
if (!cellValue) {
rowChunk.push(0);
rowChunk.push(0);
rowChunk.push(0);
rowChunk.push(1);
//value
rowChunk.push(j);
}
else {
rowChunk = rowChunk.concat(cellValue);
}
}
fullChunk = fullChunk.concat(rowChunk);
}
return {
header: getFrameHeader(fullChunk.length, types.opcodes.result, null, null, streamId),
chunk: utils.allocBufferFromArray(fullChunk.slice(fromIndex, toIndex || undefined)),
offset: 0
};
}
function getEventData(eventType, value) {
const bodyArray = [];
//EVENT TYPE
bodyArray.push(utils.allocBufferFromArray([0, eventType.length]));
bodyArray.push(utils.allocBufferFromString(eventType));
//STATUS CHANGE DESCRIPTION
bodyArray.push(utils.allocBufferFromArray([0, value.length]));
bodyArray.push(utils.allocBufferFromString(value));
//Address
bodyArray.push(utils.allocBufferFromArray([4, 127, 0, 0, 1]));
//Port
bodyArray.push(utils.allocBufferFromArray([0, 0, 0, 200]));
const body = Buffer.concat(bodyArray);
const header = new types.FrameHeader(2, 0, -1, types.opcodes.event, body.length);
return {header: header, chunk: body};
}
function buildParserAndExpect(validationFn) {
const parser = newInstance();
parser.on('readable', () => validationFn(parser.read()));
return parser;
}
/**
* Calls done in case there is an error
*/
function doneIfError(done) {
return function doneIfErrorCallback(err) {
if (err) {
done(err);
}
};
}