blob: ea1b231d5995d8634c6ad63783530c03bb4d8e5c [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* @author Jorge Bay Gondra
*/
import { Buffer } from 'buffer';
import { EventEmitter } from 'eventemitter3';
import type { Agent } from 'node:http';
import ioc from '../structure/io/binary/GraphBinary.js';
import StreamReader from '../structure/io/binary/internals/StreamReader.js';
import * as utils from '../utils.js';
import ResultSet from './result-set.js';
import {RequestMessage} from "./request-message.js";
import ResponseError from './response-error.js';
import { Traverser } from '../process/traversal.js';
const { graphBinaryReader, graphBinaryWriter } = ioc;
const responseStatusCode = {
success: 200,
noContent: 204,
partialContent: 206,
};
export type HttpRequest = {
url: string;
method: string;
headers: Record<string, string>;
body: any;
};
export type RequestInterceptor = (request: HttpRequest) => HttpRequest | Promise<HttpRequest>;
export type ConnectionOptions = {
ca?: string[];
cert?: string | string[] | Buffer;
pfx?: string | Buffer;
reader?: any;
rejectUnauthorized?: boolean;
traversalSource?: string;
writer?: any | null;
headers?: Record<string, string | string[]>;
enableUserAgentOnConnect?: boolean;
agent?: Agent;
interceptors?: RequestInterceptor | RequestInterceptor[];
};
/**
* Represents a single connection to a Gremlin Server.
*/
export default class Connection extends EventEmitter {
private readonly _reader: any;
private readonly _writer: any | null;
isOpen = true;
traversalSource: string;
private readonly _enableUserAgentOnConnect: boolean;
private readonly _interceptors: RequestInterceptor[];
/**
* Creates a new instance of {@link Connection}.
* @param {String} url The resource uri.
* @param {ConnectionOptions} [options] The connection options.
*/
constructor(
readonly url: string,
readonly options: ConnectionOptions = {},
) {
super();
this._reader = options.reader || graphBinaryReader;
this._writer = 'writer' in options ? options.writer : graphBinaryWriter;
this.traversalSource = options.traversalSource || 'g';
this._enableUserAgentOnConnect = options.enableUserAgentOnConnect !== false;
const interceptors = options.interceptors;
if (typeof interceptors === 'function') {
this._interceptors = [interceptors];
} else if (Array.isArray(interceptors)) {
this._interceptors = interceptors;
} else if (interceptors === undefined || interceptors === null) {
this._interceptors = [];
} else {
throw new TypeError('interceptors must be a function, array, or undefined');
}
}
/**
* Opens the connection, if its not already opened.
* @returns {Promise}
*/
async open() {
// No-op for HTTP connections
return Promise.resolve();
}
/**
* Send a request and buffer the entire response. Returns a Promise<ResultSet>.
*/
async submit(request: RequestMessage) {
const body = this._writer ? this._writer.writeRequest(request) : request;
const response = await this.#makeHttpRequest(body);
return this.#handleResponse(response);
}
/**
* Send a request and stream the response incrementally.
* Returns an AsyncGenerator that yields deserialized result items.
* For bulked responses, yields Traverser objects.
*
* In the GraphBinary v4 streaming protocol, the server sends the status after all
* result data. If the server encounters an error mid-traversal, values yielded before
* the error are valid partial results. A ResponseError is thrown after the last value
* has been yielded.
*
* @param {RequestMessage} request
* @returns {AsyncGenerator<any>}
*/
async *stream(request: RequestMessage): AsyncGenerator<any> {
const body = this._writer ? this._writer.writeRequest(request) : request;
const abortController = new AbortController();
let response: Response;
try {
response = await this.#makeHttpRequest(body, abortController.signal);
} catch (e: any) {
throw new Error(`Stream request failed: ${e.message}`, { cause: e });
}
if (!response.ok) {
// For error responses, buffer and parse the error body
const buffer = Buffer.from(await response.arrayBuffer());
const errorMessage = `Server returned HTTP ${response.status}: ${response.statusText}`;
const contentType = response.headers.get("Content-Type");
const reader = this.#getReaderForContentType(contentType);
try {
if (reader) {
const deserialized = await reader.readResponse(buffer);
throw new ResponseError(errorMessage, {
code: deserialized.status.code,
message: deserialized.status.message || response.statusText,
exception: deserialized.status.exception,
});
} else if (contentType === 'application/json') {
const errorBody = JSON.parse(buffer.toString());
throw new ResponseError(errorMessage, {
code: response.status,
message: errorBody.message || errorBody.error || response.statusText,
});
}
} catch (err) {
if (err instanceof ResponseError) throw err;
}
throw new ResponseError(errorMessage, {
code: response.status,
message: response.statusText,
});
}
if (!response.body) {
// 204 No Content — nothing to yield
return;
}
const streamReader = StreamReader.fromReadableStream(response.body);
let completed = false;
try {
yield* this._reader.readResponseStream(streamReader);
completed = true;
} finally {
if (!completed) {
// Consumer broke out early or an error occurred — abort to release the connection
abortController.abort();
}
}
}
#getReaderForContentType(contentType: string | null) {
if (!contentType) {
return this._reader;
}
if (contentType === this._reader.mimeType) {
return this._reader;
}
return null;
}
async #makeHttpRequest(body: any, signal?: AbortSignal): Promise<Response> {
const headers: Record<string, string> = {
'Accept': this._reader.mimeType
};
if (this._writer) {
headers['Content-Type'] = this._writer.mimeType;
}
if (this._enableUserAgentOnConnect) {
const userAgent = await utils.getUserAgent();
if (userAgent !== undefined) {
headers[utils.getUserAgentHeader()] = userAgent;
}
}
if (this.options.headers) {
Object.entries(this.options.headers).forEach(([key, value]) => {
headers[key] = Array.isArray(value) ? value.join(', ') : value;
});
}
let httpRequest: HttpRequest = {
url: this.url,
method: 'POST',
headers,
body,
};
for (let i = 0; i < this._interceptors.length; i++) {
try {
httpRequest = await this._interceptors[i](httpRequest);
} catch (e: any) {
throw new Error(`Request interceptor at index ${i} failed: ${e.message}`, { cause: e });
}
}
return fetch(httpRequest.url, {
method: httpRequest.method,
headers: httpRequest.headers,
body: httpRequest.body,
signal,
});
}
async #handleResponse(response: Response) {
const contentType = response.headers.get("Content-Type");
const buffer = Buffer.from(await response.arrayBuffer());
const reader = this.#getReaderForContentType(contentType);
if (!response.ok) {
const errorMessage = `Server returned HTTP ${response.status}: ${response.statusText}`;
try {
if (reader) {
const deserialized = await reader.readResponse(buffer);
throw new ResponseError(errorMessage, {
code: deserialized.status.code,
message: deserialized.status.message || response.statusText,
exception: deserialized.status.exception,
});
} else if (contentType === 'application/json') {
const errorBody = JSON.parse(buffer.toString());
throw new ResponseError(errorMessage, {
code: response.status,
message: errorBody.message || errorBody.error || response.statusText,
});
}
} catch (err) {
if (err instanceof ResponseError) {
throw err;
}
}
throw new ResponseError(errorMessage, {
code: response.status,
message: response.statusText,
});
}
if (!reader) {
throw new Error(`Response Content-Type '${contentType}' does not match the configured reader (expected '${this._reader.mimeType}')`);
}
const deserialized = await reader.readResponse(buffer);
if (deserialized.status.code && deserialized.status.code !== responseStatusCode.success && deserialized.status.code !== responseStatusCode.noContent && deserialized.status.code !== responseStatusCode.partialContent) {
throw new ResponseError(
`Server error: ${deserialized.status.message || 'Unknown error'} (${deserialized.status.code})`,
{
code: deserialized.status.code,
message: deserialized.status.message || '',
exception: deserialized.status.exception,
}
);
}
return new ResultSet(
deserialized.result.bulked
? deserialized.result.data.map((item: { v: any; bulk: number }) => new Traverser(item.v, item.bulk))
: deserialized.result.data,
new Map(),
);
}
/**
* Closes the Connection.
* @return {Promise}
*/
close() {
return Promise.resolve();
}
}