blob: cf89c84872fbf5ac1eab4df841815314f1fadcc6 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* GraphBinaryReader v4 response format tests.
* Tests the reader's ability to parse v4 response format:
* {version:0x84}{bulked:Byte}{result_data stream}{marker:0xFD 0x00 0x00}{status_code:Int bare}{status_message:nullable}{exception:nullable}
*/
import { assert } from 'chai';
import { Buffer } from 'buffer';
import GraphBinaryReader from '../../../lib/structure/io/binary/internals/GraphBinaryReader.js';
import StreamReader from '../../../lib/structure/io/binary/internals/StreamReader.js';
import ioc from '../../../lib/structure/io/binary/GraphBinary.js';
import { Traverser } from '../../../lib/process/traversal.js';
import ResponseError from '../../../lib/driver/response-error.js';
describe('GraphBinaryReader', () => {
const reader = new GraphBinaryReader(ioc);
describe('input validation', () => {
it('undefined buffer throws error', async () => {
try {
await reader.readResponse(undefined);
assert.fail('should have thrown');
} catch (e) {
assert.match(e.message, /Buffer is missing/);
}
});
it('null buffer throws error', async () => {
try {
await reader.readResponse(null);
assert.fail('should have thrown');
} catch (e) {
assert.match(e.message, /Buffer is missing/);
}
});
it('empty buffer throws error', async () => {
try {
await reader.readResponse(Buffer.alloc(0));
assert.fail('should have thrown');
} catch (e) {
assert.match(e.message, /Buffer is empty/);
}
});
});
describe('version validation', () => {
it('rejects version 0x00', async () => {
try {
await reader.readResponse(Buffer.from([0x00, 0x00]));
assert.fail('should have thrown');
} catch (e) {
assert.match(e.message, /Unsupported version/);
}
});
it('rejects version 0x81', async () => {
try {
await reader.readResponse(Buffer.from([0x81, 0x00]));
assert.fail('should have thrown');
} catch (e) {
assert.match(e.message, /Unsupported version/);
}
});
});
describe('non-bulked responses', () => {
it('single value', async () => {
const buffer = Buffer.from([
0x84, // version
0x00, // bulked=false
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int: type_code=0x01, value_flag=0x00, value=67
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // status_message null flag
0x01 // exception null flag
]);
const result = await reader.readResponse(buffer);
assert.deepEqual(result, {
status: { code: 200, message: null, exception: null },
result: { data: [67], bulked: false },
});
});
it('multiple values', async () => {
const buffer = Buffer.from([
0x84, // version
0x00, // bulked=false
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int: 67
0x03, 0x00, 0x00, 0x00, 0x00, 0x05, 0x68, 0x65, 0x6C, 0x6C, 0x6F, // fq String: "hello"
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // status_message null
0x01 // exception null
]);
const result = await reader.readResponse(buffer);
assert.deepEqual(result, {
status: { code: 200, message: null, exception: null },
result: { data: [67, 'hello'], bulked: false }
});
});
it('empty result', async () => {
const buffer = Buffer.from([
0x84, // version
0x00, // bulked=false
0xFD, 0x00, 0x00, // marker (no data)
0x00, 0x00, 0x00, 0xCC, // status_code=204
0x01, // status_message null
0x01 // exception null
]);
const result = await reader.readResponse(buffer);
assert.deepEqual(result, {
status: { code: 204, message: null, exception: null },
result: { data: [], bulked: false }
});
});
});
describe('bulked responses', () => {
it('single item with bulk count', async () => {
const buffer = Buffer.from([
0x84, // version
0x01, // bulked=true
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int: 67
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // fq Long bulk=3
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // status_message null
0x01 // exception null
]);
const result = await reader.readResponse(buffer);
assert.deepEqual(result, {
status: { code: 200, message: null, exception: null },
result: { data: [{ v: 67, bulk: 3 }], bulked: true }
});
});
it('multiple items with bulk counts', async () => {
const buffer = Buffer.from([
0x84, // version
0x01, // bulked=true
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int: 67
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, // fq Long bulk=2
0x03, 0x00, 0x00, 0x00, 0x00, 0x05, 0x68, 0x65, 0x6C, 0x6C, 0x6F, // fq String: "hello"
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // fq Long bulk=1
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // status_message null
0x01 // exception null
]);
const result = await reader.readResponse(buffer);
assert.deepEqual(result, {
status: { code: 200, message: null, exception: null },
result: { data: [{ v: 67, bulk: 2 }, { v: 'hello', bulk: 1 }], bulked: true }
});
});
});
describe('status codes', () => {
it('status 403', async () => {
const buffer = Buffer.from([
0x84, 0x00, // version, bulked=false
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x01, 0x93, // status_code=403
0x01, 0x01 // null message, null exception
]);
const result = await reader.readResponse(buffer);
assert.equal(result.status.code, 403);
});
it('status 500', async () => {
const buffer = Buffer.from([
0x84, 0x00, // version, bulked=false
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x01, 0xF4, // status_code=500
0x01, 0x01 // null message, null exception
]);
const result = await reader.readResponse(buffer);
assert.equal(result.status.code, 500);
});
});
describe('nullable status_message', () => {
it('present message', async () => {
const buffer = Buffer.from([
0x84, 0x00, // version, bulked=false
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x00, // message present flag
0x00, 0x00, 0x00, 0x07, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, // bare String: "Success"
0x01 // exception null
]);
const result = await reader.readResponse(buffer);
assert.equal(result.status.message, 'Success');
});
it('null message', async () => {
const buffer = Buffer.from([
0x84, 0x00, // version, bulked=false
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // message null flag
0x01 // exception null
]);
const result = await reader.readResponse(buffer);
assert.equal(result.status.message, null);
});
});
describe('nullable exception', () => {
it('present exception', async () => {
const buffer = Buffer.from([
0x84, 0x00, // version, bulked=false
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x01, 0xF4, // status_code=500
0x01, // message null
0x00, // exception present flag
0x00, 0x00, 0x00, 0x05, 0x45, 0x72, 0x72, 0x6F, 0x72 // bare String: "Error"
]);
const result = await reader.readResponse(buffer);
assert.equal(result.status.exception, 'Error');
});
it('null exception', async () => {
const buffer = Buffer.from([
0x84, 0x00, // version, bulked=false
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // message null
0x01 // exception null flag
]);
const result = await reader.readResponse(buffer);
assert.equal(result.status.exception, null);
});
});
describe('error response', () => {
it('no result data with error status', async () => {
const buffer = Buffer.from([
0x84, 0x00, // version, bulked=false
0xFD, 0x00, 0x00, // marker (no data)
0x00, 0x00, 0x01, 0xF4, // status_code=500
0x00, // message present
0x00, 0x00, 0x00, 0x0E, 0x49, 0x6E, 0x74, 0x65, 0x72, 0x6E, 0x61, 0x6C, 0x20, 0x65, 0x72, 0x72, 0x6F, 0x72, // "Internal error" (14 chars)
0x00, // exception present
0x00, 0x00, 0x00, 0x09, 0x45, 0x78, 0x63, 0x65, 0x70, 0x74, 0x69, 0x6F, 0x6E // "Exception"
]);
const result = await reader.readResponse(buffer);
assert.deepEqual(result, {
status: { code: 500, message: 'Internal error', exception: 'Exception' },
result: { data: [], bulked: false }
});
});
});
describe('complex result values', () => {
it('vertex in result data', async () => {
const buffer = Buffer.from([
0x84, 0x00, // version, bulked=false
0x11, 0x00, // fq Vertex: type_code=0x11, value_flag=0x00
0x01, 0x00, 0x00, 0x00, 0x00, 0x01, // id: fq Int=1
0x00, 0x00, 0x00, 0x01, // label: bare List length=1
0x03, 0x00, 0x00, 0x00, 0x00, 0x06, 0x70, 0x65, 0x72, 0x73, 0x6F, 0x6E, // fq String="person"
0x0B, 0x00, 0x00, 0x00, 0x00, 0x00, // properties: fq List=empty
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, 0x01 // null message, null exception
]);
const result = await reader.readResponse(buffer);
assert.equal(result.result.data.length, 1);
assert.equal(result.result.data[0].id, 1);
assert.equal(result.result.data[0].label, 'person');
});
});
describe('readResponseStream', () => {
it('non-bulked stream yields raw values', async () => {
const buffer = Buffer.from([
0x84, // version
0x00, // bulked=false
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int=67
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // status_message null
0x01 // exception null
]);
const streamReader = StreamReader.fromBuffer(buffer);
const results = [];
for await (const item of reader.readResponseStream(streamReader)) {
results.push(item);
}
assert.deepEqual(results, [67]);
});
it('bulked stream yields Traverser objects', async () => {
const buffer = Buffer.from([
0x84, // version
0x01, // bulked=true
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int=67
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // fq Long=3
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // status_message null
0x01 // exception null
]);
const streamReader = StreamReader.fromBuffer(buffer);
const results = [];
for await (const item of reader.readResponseStream(streamReader)) {
results.push(item);
}
assert.equal(results.length, 1);
assert.instanceOf(results[0], Traverser);
assert.equal(results[0].object, 67);
assert.equal(results[0].bulk, 3);
});
it('bulked stream Traversers expand correctly via Traversal', async () => {
const { Traversal } = await import('../../../lib/process/traversal.js');
const buffer = Buffer.from([
0x84, // version
0x01, // bulked=true
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int=67
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // fq Long=3
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, 0x01
]);
const streamReader = StreamReader.fromBuffer(buffer);
const gen = reader.readResponseStream(streamReader);
const traversal = new Traversal(null, null);
traversal._resultsStream = gen;
const list = await traversal.toList();
// bulk=3 should expand to 3 copies of 67
assert.deepEqual(list, [67, 67, 67]);
});
it('trailing status is accessible via Traversal.getStatus()', async () => {
const { Traversal } = await import('../../../lib/process/traversal.js');
const buffer = Buffer.from([
0x84, // version
0x00, // bulked=false
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int=67
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x00, // message present
0x00, 0x00, 0x00, 0x02, 0x4F, 0x4B, // "OK"
0x01 // exception null
]);
const streamReader = StreamReader.fromBuffer(buffer);
const gen = reader.readResponseStream(streamReader);
const traversal = new Traversal(null, null);
traversal._resultsStream = gen;
assert.isNull(traversal.getStatus());
await traversal.toList();
const status = traversal.getStatus();
assert.isNotNull(status);
assert.equal(status.code, 200);
assert.equal(status.message, 'OK');
assert.isNull(status.exception);
});
it('throws ResponseError on server error status after yielding values', async () => {
const buffer = Buffer.from([
0x84, // version
0x00, // bulked=false
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int=67
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x01, 0xF4, // status_code=500
0x00, // message present
0x00, 0x00, 0x00, 0x0E, 0x49, 0x6E, 0x74, 0x65, 0x72, 0x6E, 0x61, 0x6C, 0x20, 0x65, 0x72, 0x72, 0x6F, 0x72, // "Internal error"
0x00, // exception present
0x00, 0x00, 0x00, 0x09, 0x45, 0x78, 0x63, 0x65, 0x70, 0x74, 0x69, 0x6F, 0x6E // "Exception"
]);
const streamReader = StreamReader.fromBuffer(buffer);
const results = [];
try {
for await (const item of reader.readResponseStream(streamReader)) {
results.push(item);
}
assert.fail('should have thrown');
} catch (e) {
assert.instanceOf(e, ResponseError);
assert.equal(e.statusCode, 500);
assert.match(e.message, /Internal error/);
}
// Verify partial results were yielded before the error
assert.deepEqual(results, [67]);
});
it('hasNext() peeks without consuming from streaming source', async () => {
const { Traversal } = await import('../../../lib/process/traversal.js');
const buffer = Buffer.from([
0x84, 0x00,
0x01, 0x00, 0x00, 0x00, 0x00, 0x01, // fq Int=1
0x01, 0x00, 0x00, 0x00, 0x00, 0x02, // fq Int=2
0xFD, 0x00, 0x00,
0x00, 0x00, 0x00, 0xC8,
0x01, 0x01
]);
const streamReader = StreamReader.fromBuffer(buffer);
const gen = reader.readResponseStream(streamReader);
const traversal = new Traversal(null, null);
traversal._resultsStream = gen;
// hasNext should return true without consuming
assert.isTrue(await traversal.hasNext());
assert.isTrue(await traversal.hasNext()); // calling again should still be true
// next() should return the peeked value
const first = await traversal.next();
assert.equal(first.value, 1);
assert.isTrue(await traversal.hasNext());
const second = await traversal.next();
assert.equal(second.value, 2);
assert.isFalse(await traversal.hasNext());
});
it('deserializes correctly when response is split into 1-byte chunks via ReadableStream', async () => {
// Build a complete v4 response: version + bulked=false + Int(67) + String("hi") + marker + status 200
const buffer = Buffer.from([
0x84, // version
0x00, // bulked=false
0x01, 0x00, 0x00, 0x00, 0x00, 0x43, // fq Int=67
0x03, 0x00, 0x00, 0x00, 0x00, 0x02, 0x68, 0x69, // fq String="hi"
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // status_message null
0x01 // exception null
]);
// Split into 1-byte chunks
const chunks = [];
for (let i = 0; i < buffer.length; i++) {
chunks.push(buffer.subarray(i, i + 1));
}
const stream = new ReadableStream({
pull(controller) {
if (chunks.length > 0) {
controller.enqueue(new Uint8Array(chunks.shift()));
} else {
controller.close();
}
},
});
const streamReader = StreamReader.fromReadableStream(stream);
const results = [];
for await (const item of reader.readResponseStream(streamReader)) {
results.push(item);
}
assert.deepEqual(results, [67, 'hi']);
});
it('yields first result before remaining data is available', async () => {
// v4 response: version + bulked=false + Int(42) + Int(99) + marker + status 200
const response = Buffer.from([
0x84, // version
0x00, // bulked=false
0x01, 0x00, 0x00, 0x00, 0x00, 0x2A, // fq Int=42
0x01, 0x00, 0x00, 0x00, 0x00, 0x63, // fq Int=99
0xFD, 0x00, 0x00, // marker
0x00, 0x00, 0x00, 0xC8, // status_code=200
0x01, // status_message null
0x01 // exception null
]);
// Split: first chunk has header (2 bytes) + first value (6 bytes) = 8 bytes.
// The rest (second value + marker + status) is withheld.
const firstChunk = response.subarray(0, 8);
const remaining = response.subarray(8);
let releaseRemaining;
const remainingReady = new Promise((resolve) => { releaseRemaining = resolve; });
let closed = false;
const stream = new ReadableStream({
async pull(controller) {
if (!closed) {
// First pull: deliver just enough for the header + first value
controller.enqueue(new Uint8Array(firstChunk));
closed = true;
// Block until test signals to release the rest
await remainingReady;
controller.enqueue(new Uint8Array(remaining));
controller.close();
}
},
});
const streamReader = StreamReader.fromReadableStream(stream);
const gen = reader.readResponseStream(streamReader);
// First next() should resolve with 42 — only the first chunk was delivered
const first = await gen.next();
assert.isFalse(first.done);
assert.equal(first.value, 42);
// Release the remaining data
releaseRemaining();
// Second value should now be available
const second = await gen.next();
assert.isFalse(second.done);
assert.equal(second.value, 99);
// Generator should complete with status as return value
const end = await gen.next();
assert.isTrue(end.done);
assert.equal(end.value.code, 200);
});
});
});