/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| 'use strict'; |
| |
| const util = require('util'); |
| const { Transform, Writable } = require('stream'); |
| |
| const types = require('./types'); |
| const utils = require('./utils'); |
| const errors = require('./errors'); |
| const { FrameHeader } = types; |
| const { FrameReader } = require('./readers'); |
| |
/**
 * Transform stream that cuts incoming byte chunks into protocol frames.
 * For each frame body (or part of a frame body) found in a chunk, it pushes an object
 * <code>{ header, chunk, offset, frameEnded }</code>.
 * @param options Stream options, forwarded to the Transform constructor
 * @extends Transform
 */
function Protocol (options) {
  Transform.call(this, options);
  //Both values are resolved from the first non-empty chunk that is read
  this.version = 0;
  this.headerSize = 0;
  //Header of the frame whose body continues in the following chunk(s), if any
  this.header = null;
  //Amount of body bytes of that frame that were already seen in previous chunks
  this.bodyLength = 0;
  //Storage for a header that got split between chunks
  this.clearHeaderChunks();
}

util.inherits(Protocol, Transform);

/**
 * Transform implementation: reads the frames contained in the chunk and reports any error thrown
 * while reading through the callback.
 * @param {Buffer} chunk
 * @param {String} encoding Not used
 * @param {Function} callback Invoked with the error, or with null when the chunk was read
 */
Protocol.prototype._transform = function (chunk, encoding, callback) {
  try {
    this.readItems(chunk);
  }
  catch (err) {
    callback(err);
    return;
  }
  callback(null);
};

/**
 * Splits the chunk into frames (header and body).
 * Complete frames are pushed with <code>frameEnded: true</code>. When a body is only partially contained in the
 * chunk, the available part is pushed with <code>frameEnded: false</code> and the same header is reused for the
 * following chunk(s).
 * A header that is not fully contained in the chunk is buffered until enough bytes are received.
 * @param {Buffer} chunk
 */
Protocol.prototype.readItems = function (chunk) {
  if (!chunk || chunk.length === 0) {
    return;
  }
  if (this.version === 0) {
    //The server replies the first message with the max protocol version supported
    this.version = FrameHeader.getProtocolVersion(chunk);
    this.headerSize = FrameHeader.size(this.version);
  }

  //Take the header of the frame that was left unfinished by the previous chunk (if any)
  let header = this.header;
  this.header = null;
  let position = 0;

  const pending = this.headerChunks;
  if (pending.byteLength !== 0) {
    //Part of a header was buffered: try to complete it with this chunk
    pending.parts.push(chunk);
    const buffered = pending.byteLength;
    if (buffered + chunk.length < this.headerSize) {
      //Still not enough bytes for a header
      pending.byteLength = buffered + chunk.length;
      return;
    }
    header = FrameHeader.fromBuffer(Buffer.concat(pending.parts, this.headerSize));
    //Skip the bytes of this chunk that were used to complete the header
    position = this.headerSize - buffered;
    this.clearHeaderChunks();
  }

  const frames = [];
  for (;;) {
    if (!header) {
      const available = chunk.length - position;
      if (this.headerSize > available) {
        if (available <= 0) {
          //The chunk was fully consumed
          break;
        }
        //Incomplete header: keep the bytes until the next chunk arrives
        const partial = chunk.slice(position, chunk.length);
        this.headerChunks.parts.push(partial);
        this.headerChunks.byteLength = partial.length;
        break;
      }
      header = FrameHeader.fromBuffer(chunk, position);
      position += this.headerSize;
    }

    const remaining = chunk.length - position;
    const bodyIsComplete = header.bodyLength <= remaining + this.bodyLength;
    if (bodyIsComplete) {
      frames.push({ header, chunk, offset: position, frameEnded: true });
      //Move past the part of the body contained in this chunk
      position += header.bodyLength - this.bodyLength;
      this.bodyLength = 0;
    }
    else if (remaining >= 0) {
      //The body continues in the next chunk: remember the header and how much was seen so far
      this.header = header;
      this.bodyLength += remaining;
      if (remaining > 0) {
        //Only emit when there is at least one body byte in this chunk
        frames.push({ header, chunk, offset: position, frameEnded: false });
      }
      break;
    }
    header = null;
  }

  frames.forEach((frame) => {
    this.push(frame);
  });
};

/**
 * Resets the storage used for headers that are split between chunks.
 */
Protocol.prototype.clearHeaderChunks = function () {
  this.headerChunks = { byteLength: 0, parts: [] };
};
| |
/**
 * A Transform stream that reads items containing a frame header and a body chunk
 * (<code>{header, chunk, offset, frameEnded}</code>) and pushes parsed items: header + (row | result | error | ...).
 * @param {Object} streamOptions Node.js Stream options
 * @param {Encoder} encoder Encoder instance for the parser to use
 * @extends Transform
 */
function Parser (streamOptions, encoder) {
  Transform.call(this, streamOptions);
  //State of the frames that are being parsed and options set via setOptions(), indexed by stream id
  this.frames = {};
  this.encoder = encoder;
}

util.inherits(Parser, Transform);

/**
 * Transform implementation: parses the body chunk contained in the item and, when the item is the last one of
 * the frame, pushes an item signaling that the frame ended.
 * @param {{header: FrameHeader, chunk: Buffer, offset: Number, frameEnded: Boolean}} item
 * @param {String} encoding Not used
 * @param {Function} callback Invoked with the error thrown while parsing or null
 */
Parser.prototype._transform = function (item, encoding, callback) {
  const frameInfo = this.frameState(item);

  let error = null;
  try {
    this.parseBody(frameInfo, item);
  }
  catch (err) {
    error = err;
  }
  //NOTE(review): the callback is invoked before the trailing push() calls below. The original ordering is
  //preserved on purpose, confirm against the Node.js Transform contract before changing it.
  callback(error);

  if (!item.frameEnded) {
    return;
  }
  if (frameInfo.cellBuffer) {
    //Frame was being streamed but there is still data buffered for a cell after the last chunk
    this.push({
      header: frameInfo.header,
      error: new errors.DriverInternalError('There was an problem while parsing streaming frame, opcode ' +
        frameInfo.header.opcode)
    });
  }
  //all the parsing finished and it was streamed down
  //emit an item that signals it
  this.push({ header: frameInfo.header, frameEnded: true});
};

/**
 * Parses the body (or the available part of it) and pushes the parsed item(s) according to the opcode.
 * @param frameInfo State of the frame, as returned by frameState()
 * @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
 */
Parser.prototype.parseBody = function (frameInfo, item) {
  //Only RESULT frames that were requested by row are parsed while the body is still being received
  frameInfo.isStreaming = frameInfo.byRow && item.header.opcode === types.opcodes.result;
  if (!this.handleFrameBuffers(frameInfo, item)) {
    // Frame isn't complete and we are not streaming the frame
    return;
  }
  const reader = new FrameReader(item.header, item.chunk, item.offset);
  // Check that flags have not been parsed yet for this frame
  if (frameInfo.flagsInfo === undefined) {
    const originalOffset = reader.offset;
    try {
      frameInfo.flagsInfo = reader.readFlagsInfo();
    }
    catch (e) {
      return this.handleParsingError(e, frameInfo, reader, originalOffset);
    }
  }

  //All the body for most operations is already buffered at this stage
  //Except for RESULT
  switch (item.header.opcode) {
    case types.opcodes.result:
      return this.parseResult(frameInfo, reader);
    case types.opcodes.ready:
    case types.opcodes.authSuccess:
      return this.push({ header: frameInfo.header, ready: true });
    case types.opcodes.authChallenge:
      return this.push({ header: frameInfo.header, authChallenge: true, token: reader.readBytes()});
    case types.opcodes.authenticate:
      return this.push({ header: frameInfo.header, mustAuthenticate: true, authenticatorName: reader.readString()});
    case types.opcodes.error:
      return this.push({ header: frameInfo.header, error: reader.readError()});
    case types.opcodes.supported:
      return this.push({ header: frameInfo.header, supported: reader.readStringMultiMap()});
    case types.opcodes.event:
      return this.push({ header: frameInfo.header, event: reader.readEvent()});
    default:
      return this.push({ header: frameInfo.header, error: new Error('Received invalid opcode: ' + item.header.opcode) });
  }
};

/**
 * Buffers if needed and returns true if it has all the necessary data to continue parsing the frame.
 * When previously buffered data is used, <code>item.chunk</code> (and <code>item.offset</code> for complete
 * frame bodies) is replaced in place with the concatenated data.
 * @param frameInfo
 * @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
 * @returns {Boolean}
 * @throws {DriverInternalError} When a following chunk of a buffered cell has an offset, or when data outside
 * a row cell had to be buffered more than once.
 */
Parser.prototype.handleFrameBuffers = function (frameInfo, item) {
  if (!frameInfo.isStreaming) {
    // Handle buffering for complete frame bodies
    const currentLength = (frameInfo.bufferLength || 0) + item.chunk.length - item.offset;
    if (currentLength < item.header.bodyLength) {
      //buffer until the frame is completed
      this.addFrameBuffer(frameInfo, item);
      return false;
    }
    //We have received the full frame body
    if (frameInfo.buffers) {
      item.chunk = this.getFrameBuffer(frameInfo, item);
      item.offset = 0;
    }
    return true;
  }
  const cellBuffer = frameInfo.cellBuffer;
  if (cellBuffer) {
    // Handle buffering for frame cells (row cells or metadata cells)
    if (item.offset !== 0) {
      throw new errors.DriverInternalError('Following chunks can not have an offset greater than zero');
    }
    cellBuffer.parts.push(item.chunk);
    //Length of the buffered data including this chunk
    const totalLength = cellBuffer.byteLength + item.chunk.length;
    if (!cellBuffer.expectedLength) {
      //Its a buffer outside a row cell (metadata or other)
      if (cellBuffer.parts.length !== 2) {
        throw new errors.DriverInternalError('Buffer for streaming frame can not contain more than 1 item');
      }
      item.chunk = Buffer.concat(cellBuffer.parts, totalLength);
      frameInfo.cellBuffer = null;
      return true;
    }
    if (cellBuffer.expectedLength > totalLength) {
      //We still haven't got the cell data
      cellBuffer.byteLength = totalLength;
      return false;
    }
    item.chunk = Buffer.concat(cellBuffer.parts, totalLength);
    frameInfo.cellBuffer = null;
  }
  return true;
};

/**
 * Adds this chunk to the frame buffers.
 * The first chunk is stored from the item offset; following chunks must start at offset zero.
 * @param frameInfo
 * @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
 * @throws {DriverInternalError} When a following chunk has an offset greater than zero.
 */
Parser.prototype.addFrameBuffer = function (frameInfo, item) {
  if (!frameInfo.buffers) {
    frameInfo.buffers = [ item.chunk.slice(item.offset) ];
    frameInfo.bufferLength = item.chunk.length - item.offset;
    return;
  }
  if (item.offset > 0) {
    throw new errors.DriverInternalError('Following chunks can not have an offset greater than zero');
  }
  frameInfo.buffers.push(item.chunk);
  frameInfo.bufferLength += item.chunk.length;
};

/**
 * Adds the last chunk and concatenates the frame buffers.
 * The result is limited to the body length declared in the frame header, so bytes at the end of the last chunk
 * that are not part of this frame body are left out.
 * @param frameInfo
 * @param {{header: FrameHeader, chunk: Buffer, offset: Number}} item
 * @returns {Buffer} The frame body
 */
Parser.prototype.getFrameBuffer = function (frameInfo, item) {
  frameInfo.buffers.push(item.chunk);
  //The body length is defined in the header (frameInfo itself never carried a bodyLength property)
  const result = Buffer.concat(frameInfo.buffers, item.header.bodyLength);
  frameInfo.buffers = null;
  return result;
};

/**
 * Tries to read the result in the body of a message
 * @param frameInfo Frame information, header / metadata
 * @param {FrameReader} reader
 */
Parser.prototype.parseResult = function (frameInfo, reader) {
  let result;
  // As we might be streaming and the frame buffer might not be complete,
  // read the metadata and different types of result values in a try-catch.
  // Store the reader position
  const originalOffset = reader.offset;
  try {
    if (!frameInfo.meta) {
      frameInfo.kind = reader.readInt();
      // Spec 4.2.5
      switch (frameInfo.kind) {
        case types.resultKind.voidResult:
          result = { header: frameInfo.header, flags: frameInfo.flagsInfo };
          break;
        case types.resultKind.rows:
          // Parse the rows metadata, the rest of the response is going to be parsed afterwards
          frameInfo.meta = reader.readMetadata(frameInfo.kind);
          break;
        case types.resultKind.setKeyspace:
          result = { header: frameInfo.header, keyspaceSet: reader.readString(), flags: frameInfo.flagsInfo };
          break;
        case types.resultKind.prepared:
        {
          const preparedId = utils.copyBuffer(reader.readShortBytes());
          frameInfo.meta = reader.readMetadata(frameInfo.kind);
          result = { header: frameInfo.header, id: preparedId, meta: frameInfo.meta, flags: frameInfo.flagsInfo };
          break;
        }
        case types.resultKind.schemaChange:
          result = { header: frameInfo.header, schemaChange: reader.parseSchemaChange(), flags: frameInfo.flagsInfo };
          break;
        default:
          //Error constructors must be invoked with "new" to obtain an error instance
          throw new errors.DriverInternalError('Unexpected result kind: ' + frameInfo.kind);
      }
    }
  }
  catch (e) {
    return this.handleParsingError(e, frameInfo, reader, originalOffset);
  }
  if (result) {
    if (frameInfo.emitted) {
      // It may contain additional metadata and info that it's not being parsed
      return;
    }
    frameInfo.emitted = true;
    return this.push(result);
  }
  if (reader.remainingLength() > 0) {
    this.parseRows(frameInfo, reader);
  }
};

/**
 * Reads the rows from the reader and pushes one item per row.
 * When the data of a cell is not fully available (streaming), the parsing state is stored in the frameInfo by
 * handleParsingError() and the method returns; parsing continues from <code>frameInfo.rowIndex</code>
 * the next time it's invoked.
 * @param frameInfo
 * @param {FrameReader} reader
 */
Parser.prototype.parseRows = function (frameInfo, reader) {
  if (frameInfo.parsingError) {
    //No more processing on this frame
    return;
  }
  if (frameInfo.rowLength === undefined) {
    try {
      frameInfo.rowLength = reader.readInt();
    }
    catch (e) {
      return this.handleParsingError(e, frameInfo, reader);
    }
  }
  if (frameInfo.rowLength === 0) {
    return this.push({
      header: frameInfo.header,
      result: { rows: utils.emptyArray, meta: frameInfo.meta, flags: frameInfo.flagsInfo }
    });
  }
  const meta = frameInfo.meta;
  frameInfo.rowIndex = frameInfo.rowIndex || 0;
  for (let i = frameInfo.rowIndex; i < frameInfo.rowLength; i++) {
    //Position where the row starts, used to buffer the whole row when one of the cells is incomplete
    const rowOffset = reader.offset;
    const row = new types.Row(meta.columns);
    let cellBuffer;
    for (let j = 0; j < meta.columns.length; j++ ) {
      const c = meta.columns[j];
      try {
        cellBuffer = reader.readBytes();
      }
      catch (e) {
        return this.handleParsingError(e, frameInfo, reader, rowOffset, i);
      }
      try {
        row[c.name] = this.encoder.decode(cellBuffer, c.type);
      }
      catch (e) {
        //Something went wrong while decoding, we are not going to be able to recover
        return this.handleParsingError(e, frameInfo, null);
      }
    }
    this.push({
      header: frameInfo.header,
      row: row,
      meta: frameInfo.meta,
      byRow: frameInfo.byRow,
      length: frameInfo.rowLength,
      flags: frameInfo.flagsInfo
    });
  }
  if (frameInfo.byRow) {
    // Use an event item to identify that all the streaming rows have finished processing
    this.push({
      header: frameInfo.header,
      byRowCompleted: true,
      meta: frameInfo.meta,
      length: frameInfo.rowLength,
      flags: frameInfo.flagsInfo
    });
  }
};

/**
 * Sets parser options (ie: how to yield the results as they are parsed)
 * @param {Number} id Id of the stream
 * @param options
 * @throws {DriverError} When there is already state stored for the stream id.
 */
Parser.prototype.setOptions = function (id, options) {
  if (this.frames[id.toString()]) {
    throw new types.DriverError('There was already state for this frame');
  }
  this.frames[id.toString()] = options;
};

/**
 * Manually clears the frame options.
 * This class already clears the provided options when the frame ends, so it's usually not required to invoke this
 * method.
 * When manually setting the options for continuous paging, it's possible that the frame options are set while
 * it's being cancelled.
 * @param {Number} id The streamId
 */
Parser.prototype.clearOptions = function (id) {
  delete this.frames[id.toString()];
};

/**
 * Gets the frame info from the internal state.
 * In case it is not there, it creates it (and stores it only when more items are expected for the frame).
 * In case the frame ended, it removes the stored state.
 * The returned object always references the header of the item.
 * @param {{header: FrameHeader, frameEnded: Boolean}} item
 */
Parser.prototype.frameState = function (item) {
  let frameInfo = this.frames[item.header.streamId];
  if (!frameInfo) {
    frameInfo = {};
    if (!item.frameEnded) {
      //store it in the frames
      this.frames[item.header.streamId] = frameInfo;
    }
  }
  else if (item.frameEnded) {
    //if it was already stored, remove it
    delete this.frames[item.header.streamId];
  }
  frameInfo.header = item.header;
  return frameInfo;
};

/**
 * Handles parsing error: pushing an error if its unexpected or buffer the cell if its streaming
 * @param {Error} e
 * @param frameInfo
 * @param {FrameReader} reader When null, the error is always treated as unexpected
 * @param {Number} [originalOffset]
 * @param {Number} [rowIndex]
 */
Parser.prototype.handleParsingError = function (e, frameInfo, reader, originalOffset, rowIndex) {
  if (reader && frameInfo.isStreaming && (e instanceof RangeError)) {
    //A controlled error, buffer from offset and move on
    return this.bufferResultCell(frameInfo, reader, originalOffset, rowIndex, e.expectedLength);
  }
  frameInfo.parsingError = true;
  frameInfo.cellBuffer = null;
  this.push({ header: frameInfo.header, error: e });
};

/**
 * When streaming, it buffers data since originalOffset.
 * @param frameInfo
 * @param {FrameReader} reader
 * @param {Number} [originalOffset] Defaults to the current offset of the reader
 * @param {Number} [rowIndex]
 * @param {Number} [expectedLength]
 */
Parser.prototype.bufferResultCell = function (frameInfo, reader, originalOffset, rowIndex, expectedLength) {
  if (!originalOffset && originalOffset !== 0) {
    originalOffset = reader.offset;
  }
  frameInfo.rowIndex = rowIndex;
  const buffer = reader.slice(originalOffset);
  frameInfo.cellBuffer = {
    parts: [ buffer ],
    byteLength: buffer.length,
    expectedLength: expectedLength
  };
};
| |
/**
 * Represents a writable stream that emits events ('result', 'row', 'byRowCompleted', 'frameEnded',
 * 'lastContinuousPage' and 'nodeEvent') for the parsed items that are written to it.
 * @param options Stream options, forwarded to the Writable constructor
 * @extends Writable
 */
function ResultEmitter(options) {
  Writable.call(this, options);
  /**
   * Stores the rows for frames that needs to be yielded as one result with many rows, indexed by stream id
   */
  this.rowBuffer = {};
}

util.inherits(ResultEmitter, Writable);

/**
 * Writable implementation: emits the event that corresponds to the item and reports any error thrown in the
 * process through the callback.
 * @param item Parsed item
 * @param {String} encoding Not used
 * @param {Function} callback Invoked with the error or null
 */
ResultEmitter.prototype._write = function (item, encoding, callback) {
  let error = null;
  try {
    this.each(item);
  }
  catch (err) {
    error = err;
  }
  callback(error);
};


/**
 * Analyzes the item and emit the corresponding event
 * @param item Parsed item, containing the frame header and the parsed content
 */
ResultEmitter.prototype.each = function (item) {
  if (item.error || item.result) {
    //Its either an error or an empty array rows
    //no transformation needs to be made
    return this.emit('result', item.header, item.error, item.result);
  }
  if (item.frameEnded) {
    //Rows can be left in the buffer when the frame could not be parsed completely (ie: an error after
    //some rows were yielded). Discard them, otherwise they would be mixed with the rows of the next
    //response that uses the same stream id.
    delete this.rowBuffer[item.header.streamId];
    return this.emit('frameEnded', item.header);
  }
  if (item.lastContinuousPage) {
    return this.emit('lastContinuousPage', item.header);
  }
  if (item.byRowCompleted) {
    return this.emit('byRowCompleted', item.header, item.row, item.meta, item.flags);
  }
  if (item.byRow) {
    //it should be yielded by row
    return this.emit('row', item.header, item.row, item.meta, item.length, item.flags);
  }
  if (item.row) {
    //it should be yielded as a result
    //it needs to be buffered to an array of rows
    return this.bufferAndEmit(item);
  }
  if (item.event) {
    //its an event from Cassandra
    return this.emit('nodeEvent', item.header, item.event);
  }
  //its a raw response (object with flags)
  return this.emit('result', item.header, null, item);
};

/**
 * Buffers the rows until the result set is completed and emits the result event.
 * The result set is considered completed when the amount of buffered rows equals <code>item.length</code>.
 * @param item Parsed item containing a row
 */
ResultEmitter.prototype.bufferAndEmit = function (item) {
  let rows = this.rowBuffer[item.header.streamId];
  if (!rows) {
    rows = this.rowBuffer[item.header.streamId] = [];
  }
  rows.push(item.row);
  if (rows.length === item.length) {
    this.emit('result', item.header, null, { rows: rows, meta: item.meta, flags: item.flags});
    delete this.rowBuffer[item.header.streamId];
  }
};
| |
// Public API of the module: the stream constructors defined in this file
Object.assign(exports, { Protocol, Parser, ResultEmitter });