| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /** |
| * @author Jorge Bay Gondra |
| */ |
| |
| import { Buffer } from 'buffer'; |
| import { EventEmitter } from 'eventemitter3'; |
| import type { Agent } from 'node:http'; |
| import Stream from 'readable-stream'; |
| import type { |
| CloseEvent as NodeWebSocketCloseEvent, |
| ErrorEvent as NodeWebSocketErrorEvent, |
| MessageEvent as NodeWebSocketMessageEvent, |
| WebSocket as NodeWebSocket, |
| Event as NodeWebSocketEvent, |
| } from 'ws'; |
| import ioc from '../structure/io/binary/GraphBinary.js'; |
| import * as serializer from '../structure/io/graph-serializer.js'; |
| import * as utils from '../utils.js'; |
| import Authenticator from './auth/authenticator.js'; |
| import ResponseError from './response-error.js'; |
| import ResultSet from './result-set.js'; |
| |
| const { DeferredPromise } = utils; |
| const { graphBinaryReader, graphBinaryWriter } = ioc; |
| |
| const responseStatusCode = { |
| success: 200, |
| noContent: 204, |
| partialContent: 206, |
| authenticationChallenge: 407, |
| }; |
| |
| const defaultMimeType = 'application/vnd.gremlin-v3.0+json'; |
| const graphSON2MimeType = 'application/vnd.gremlin-v2.0+json'; |
| const graphBinaryMimeType = 'application/vnd.graphbinary-v1.0'; |
| |
| type MimeType = typeof defaultMimeType | typeof graphSON2MimeType | typeof graphBinaryMimeType; |
| |
| const uuidPattern = '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'; |
| |
| export type ConnectionOptions = { |
| ca?: string[]; |
| cert?: string | string[] | Buffer; |
| mimeType?: MimeType; |
| pfx?: string | Buffer; |
| reader?: any; |
| rejectUnauthorized?: boolean; |
| traversalSource?: string; |
| writer?: any; |
| authenticator?: Authenticator; |
| headers?: Record<string, string | string[]>; |
| enableUserAgentOnConnect?: boolean; |
| agent?: Agent; |
| }; |
| |
| /** |
| * Represents a single connection to a Gremlin Server. |
| */ |
| export default class Connection extends EventEmitter { |
| private _ws: WebSocket | NodeWebSocket | undefined; |
| |
| readonly mimeType: MimeType; |
| |
| private readonly _responseHandlers: Record<string, { callback: (...args: any[]) => unknown; result: any }> = {}; |
| private readonly _reader: any; |
| private readonly _writer: any; |
| private _openPromise: ReturnType<typeof DeferredPromise<void>> | null; |
| private _openCallback: (() => unknown) | null; |
| private _closePromise: Promise<void> | null; |
| private _closeCallback: (() => unknown) | null; |
| |
| private readonly _header: string; |
| private readonly _header_buf: Buffer; |
| |
| isOpen = false; |
| traversalSource: string; |
| |
| private readonly _authenticator: any; |
| private readonly _enableUserAgentOnConnect: boolean; |
| |
| /** |
| * Creates a new instance of {@link Connection}. |
| * @param {String} url The resource uri. |
| * @param {Object} [options] The connection options. |
| * @param {Array} [options.ca] Trusted certificates. |
| * @param {String|Array|Buffer} [options.cert] The certificate key. |
| * @param {String} [options.mimeType] The mime type to use. |
| * @param {String|Buffer} [options.pfx] The private key, certificate, and CA certs. |
| * @param {GraphSONReader} [options.reader] The reader to use. |
| * @param {Boolean} [options.rejectUnauthorized] Determines whether to verify or not the server certificate. |
| * @param {String} [options.traversalSource] The traversal source. Defaults to: 'g'. |
| * @param {GraphSONWriter} [options.writer] The writer to use. |
| * @param {Authenticator} [options.authenticator] The authentication handler to use. |
| * @param {Object} [options.headers] An associative array containing the additional header key/values for the initial request. |
| * @param {Boolean} [options.enableUserAgentOnConnect] Determines if a user agent will be sent during connection handshake. Defaults to: true |
| * @param {http.Agent} [options.agent] The http.Agent implementation to use. |
| * @constructor |
| */ |
| constructor( |
| readonly url: string, |
| readonly options: ConnectionOptions = {}, |
| ) { |
| super(); |
| |
| /** |
| * Gets the MIME type. |
| * @type {String} |
| */ |
| this.mimeType = options.mimeType || defaultMimeType; |
| |
| // A map containing the request id and the handler. The id should be in lower case to prevent string comparison issues. |
| this._responseHandlers = {}; |
| this._reader = options.reader || this.#getDefaultReader(this.mimeType); |
| this._writer = options.writer || this.#getDefaultWriter(this.mimeType); |
| this._openPromise = null; |
| this._openCallback = null; |
| this._closePromise = null; |
| this._closeCallback = null; |
| |
| this._header = String.fromCharCode(this.mimeType.length) + this.mimeType; // TODO: what if mimeType.length > 255 |
| this._header_buf = Buffer.from(this._header); |
| this.traversalSource = options.traversalSource || 'g'; |
| this._authenticator = options.authenticator; |
| this._enableUserAgentOnConnect = options.enableUserAgentOnConnect !== false; |
| } |
| |
| /** |
| * Opens the connection, if its not already opened. |
| * @returns {Promise} |
| */ |
| async open() { |
| if (this.isOpen) { |
| return; |
| } |
| if (this._openPromise) { |
| return this._openPromise; |
| } |
| |
| this._openPromise = DeferredPromise(); |
| |
| this.emit('log', 'ws open'); |
| let headers = this.options.headers; |
| if (this._enableUserAgentOnConnect) { |
| if (!headers) { |
| headers = {}; |
| } |
| |
| const userAgent = await utils.getUserAgent(); |
| if (userAgent !== undefined) { |
| headers[utils.getUserAgentHeader()] = userAgent; |
| } |
| } |
| |
| const WebSocket = (globalThis.WebSocket as typeof globalThis.WebSocket | undefined) ?? (await import('ws')).default; |
| |
| this._ws = new WebSocket( |
| this.url, |
| globalThis.WebSocket === undefined |
| ? { |
| // @ts-expect-error |
| headers: headers, |
| ca: this.options.ca, |
| cert: this.options.cert, |
| pfx: this.options.pfx, |
| rejectUnauthorized: this.options.rejectUnauthorized, |
| agent: this.options.agent, |
| } |
| : undefined, |
| ); |
| |
| if ('binaryType' in this._ws!) { |
| this._ws.binaryType = 'arraybuffer'; |
| } |
| |
| // @ts-expect-error |
| this._ws!.addEventListener('open', this.#handleOpen); |
| // @ts-expect-error |
| this._ws!.addEventListener('error', this.#handleError); |
| // @ts-expect-error |
| this._ws!.addEventListener('message', this.#handleMessage); |
| // @ts-expect-error |
| this._ws!.addEventListener('close', this.#handleClose); |
| |
| return await this._openPromise; |
| } |
| |
| /** @override */ |
| submit(processor: string | undefined, op: string, args: any, requestId?: string | null) { |
| // TINKERPOP-2847: Use lower case to prevent string comparison issues. |
| const rid = (requestId || utils.getUuid()).toLowerCase(); |
| if (!rid.match(uuidPattern)) { |
| throw new Error('Provided requestId "' + rid + '" is not a valid UUID.'); |
| } |
| |
| return this.open().then( |
| () => |
| new Promise((resolve, reject) => { |
| if (op !== 'authentication') { |
| this._responseHandlers[rid] = { |
| callback: (err: Error, result: any) => (err ? reject(err) : resolve(result)), |
| result: null, |
| }; |
| } |
| |
| const request = { |
| requestId: rid, |
| op: op || 'bytecode', |
| // if using op eval need to ensure processor stays unset if caller didn't set it. |
| processor: !processor && op !== 'eval' ? 'traversal' : processor, |
| args: args || {}, |
| }; |
| |
| const request_buf = this._writer.writeRequest(request); |
| const message = utils.toArrayBuffer(Buffer.concat([this._header_buf, request_buf])); |
| this._ws!.send(message); |
| }), |
| ); |
| } |
| |
| /** @override */ |
| stream(processor: string, op: string, args: any, requestId?: string) { |
| // TINKERPOP-2847: Use lower case to prevent string comparison issues. |
| const rid = (requestId || utils.getUuid()).toLowerCase(); |
| if (!rid.match(uuidPattern)) { |
| throw new Error('Provided requestId "' + rid + '" is not a valid UUID.'); |
| } |
| |
| const readableStream = new Stream.Readable({ |
| objectMode: true, |
| read() {}, |
| }); |
| |
| this._responseHandlers[rid] = { |
| callback: (err: Error) => (err ? readableStream.destroy(err) : readableStream.push(null)), |
| result: readableStream, |
| }; |
| |
| this.open() |
| .then(() => { |
| const request = { |
| requestId: rid, |
| op: op || 'bytecode', |
| // if using op eval need to ensure processor stays unset if caller didn't set it. |
| processor: !processor && op !== 'eval' ? 'traversal' : processor, |
| args: args || {}, |
| }; |
| |
| const request_buf = this._writer.writeRequest(request); |
| const message = utils.toArrayBuffer(Buffer.concat([this._header_buf, request_buf])); |
| this._ws!.send(message); |
| }) |
| .catch((err) => readableStream.destroy(err)); |
| |
| return readableStream; |
| } |
| |
| #getDefaultReader(mimeType: MimeType) { |
| if (mimeType === graphBinaryMimeType) { |
| return graphBinaryReader; |
| } |
| |
| return mimeType === graphSON2MimeType ? new serializer.GraphSON2Reader() : new serializer.GraphSONReader(); |
| } |
| |
| #getDefaultWriter(mimeType: MimeType) { |
| if (mimeType === graphBinaryMimeType) { |
| return graphBinaryWriter; |
| } |
| |
| return mimeType === graphSON2MimeType ? new serializer.GraphSON2Writer() : new serializer.GraphSONWriter(); |
| } |
| |
| #handleOpen = (_: Event | NodeWebSocketEvent) => { |
| this._openPromise?.resolve(); |
| this.isOpen = true; |
| }; |
| |
| #handleError = (event: Event | NodeWebSocketErrorEvent) => { |
| const error = 'error' in event ? event.error : event; |
| this._openPromise?.reject(error); |
| this.emit('log', `ws error ${error}`); |
| this.#cleanupWebsocket(error); |
| this.emit('socketError', error); |
| }; |
| |
| #handleClose = ({ code, reason }: CloseEvent | NodeWebSocketCloseEvent) => { |
| this.emit('log', `ws close code=${code} message=${reason}`); |
| this.#cleanupWebsocket(); |
| if (this._closeCallback) { |
| this._closeCallback(); |
| } |
| this.emit('close', code, reason); |
| }; |
| |
| #handleMessage = ({ data: _data }: MessageEvent | NodeWebSocketMessageEvent) => { |
| const data = _data instanceof ArrayBuffer ? Buffer.from(_data) : _data; |
| |
| const response = this._reader.readResponse(data); |
| if (response.requestId === null || response.requestId === undefined) { |
| // There was a serialization issue on the server that prevented the parsing of the request id |
| // We invoke any of the pending handlers with an error |
| Object.keys(this._responseHandlers).forEach((requestId) => { |
| const handler = this._responseHandlers[requestId]; |
| this.#clearHandler(requestId); |
| if (response.status !== undefined && response.status.message) { |
| return handler.callback( |
| // TINKERPOP-2285: keep the old server error message in case folks are parsing that - fix in a future breaking version |
| new ResponseError( |
| `Server error (no request information): ${response.status.message} (${response.status.code})`, |
| response.status, |
| ), |
| ); |
| } |
| // TINKERPOP-2285: keep the old server error message in case folks are parsing that - fix in a future breaking version |
| return handler.callback( |
| new ResponseError(`Server error (no request information): ${JSON.stringify(response)}`, response.status), |
| ); |
| }); |
| return; |
| } |
| |
| // TINKERPOP-2847: Use lower case to prevent string comparison issues. |
| response.requestId = response.requestId.toLowerCase(); |
| const handler = this._responseHandlers[response.requestId]; |
| |
| if (!handler) { |
| // The handler for a given request id was not found |
| // It was probably invoked earlier due to a serialization issue. |
| return; |
| } |
| |
| if (response.status.code === responseStatusCode.authenticationChallenge && this._authenticator) { |
| this._authenticator |
| .evaluateChallenge(response.result.data) |
| .then((res: any) => this.submit(undefined, 'authentication', res, response.requestId)) |
| .catch(handler.callback); |
| |
| return; |
| } else if (response.status.code >= 400) { |
| // callback in error |
| return handler.callback( |
| // TINKERPOP-2285: keep the old server error message in case folks are parsing that - fix in a future breaking version |
| new ResponseError(`Server error: ${response.status.message} (${response.status.code})`, response.status), |
| ); |
| } |
| |
| const isStreamingResponse = handler.result instanceof Stream.Readable; |
| |
| switch (response.status.code) { |
| case responseStatusCode.noContent: |
| this.#clearHandler(response.requestId); |
| if (isStreamingResponse) { |
| handler.result.push(new ResultSet(utils.emptyArray, response.status.attributes)); |
| return handler.callback(null); |
| } |
| return handler.callback(null, new ResultSet(utils.emptyArray, response.status.attributes)); |
| case responseStatusCode.partialContent: |
| if (isStreamingResponse) { |
| handler.result.push(new ResultSet(response.result.data, response.status.attributes)); |
| break; |
| } |
| handler.result = handler.result || []; |
| handler.result.push.apply(handler.result, response.result.data); |
| break; |
| default: |
| if (isStreamingResponse) { |
| handler.result.push(new ResultSet(response.result.data, response.status.attributes)); |
| return handler.callback(null); |
| } |
| if (handler.result) { |
| handler.result.push.apply(handler.result, response.result.data); |
| } else { |
| handler.result = response.result.data; |
| } |
| this.#clearHandler(response.requestId); |
| return handler.callback(null, new ResultSet(handler.result, response.status.attributes)); |
| } |
| }; |
| |
| /** |
| * clean websocket context |
| */ |
| #cleanupWebsocket(err?: Error) { |
| // Invoke waiting callbacks to complete Promises when closing the websocket |
| Object.keys(this._responseHandlers).forEach((requestId) => { |
| const handler = this._responseHandlers[requestId]; |
| const isStreamingResponse = handler.result instanceof Stream.Readable; |
| if (isStreamingResponse) { |
| handler.callback(null); |
| } else { |
| const cause = err ? err : new Error('Connection has been closed.'); |
| handler.callback(cause); |
| } |
| }); |
| // @ts-expect-error |
| this._ws?.removeEventListener('open', this.#handleOpen); |
| // @ts-expect-error |
| this._ws?.removeEventListener('error', this.#handleError); |
| // @ts-expect-error |
| this._ws?.removeEventListener('message', this.#handleMessage); |
| // @ts-expect-error |
| this._ws?.removeEventListener('close', this.#handleClose); |
| this._openPromise = null; |
| this._closePromise = null; |
| this.isOpen = false; |
| } |
| |
| /** |
| * Clears the internal state containing the callback and result buffer of a given request. |
| * @param requestId |
| * @private |
| */ |
| #clearHandler(requestId: string) { |
| delete this._responseHandlers[requestId]; |
| } |
| |
| /** |
| * Closes the Connection. |
| * @return {Promise} |
| */ |
| close() { |
| if (this.isOpen === false) { |
| return Promise.resolve(); |
| } |
| if (!this._closePromise) { |
| this._closePromise = new Promise((resolve) => { |
| this._closeCallback = resolve; |
| this._ws?.close(); |
| }); |
| } |
| return this._closePromise; |
| } |
| } |