blob: b1f1274812444e8de4e937773571c9ac54026273 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Buffer } from 'buffer';
/**
* Async byte reader that provides a uniform interface over both a complete Buffer
* (for non-streaming submit()) and a ReadableStream (for streaming HTTP responses).
*
* Handles chunk boundaries transparently and blocks (awaits) until the requested bytes are available.
*/
export default class StreamReader {
/** @type {Buffer} */
#buffer;
/** @type {number} */
#offset;
/** @type {ReadableStreamDefaultReader|null} */
#reader;
/** @type {number} Total bytes consumed (monotonically increasing, survives chunk reassembly) */
#position;
/**
* @param {Buffer} initialBuffer
* @param {ReadableStreamDefaultReader|null} reader
*/
constructor(initialBuffer, reader) {
this.#buffer = initialBuffer;
this.#offset = 0;
this.#reader = reader;
this.#position = 0;
}
/**
* Create a StreamReader backed by a complete Buffer.
* All reads are satisfied from the buffer; no async I/O occurs.
* @param {Buffer} buffer
* @returns {StreamReader}
*/
static fromBuffer(buffer) {
return new StreamReader(buffer, null);
}
/**
* Create a StreamReader backed by a ReadableStream (e.g. fetch response.body).
* Reads pull chunks from the stream as needed.
* @param {ReadableStream} readableStream
* @returns {StreamReader}
*/
static fromReadableStream(readableStream) {
return new StreamReader(Buffer.alloc(0), readableStream.getReader());
}
/**
* Ensure at least `n` bytes are available in the buffer from the current offset.
* For buffer-backed readers this is a bounds check. For stream-backed readers
* this pulls chunks until enough data is buffered.
* @param {number} n
*/
async #ensure(n) {
const available = this.#buffer.length - this.#offset;
if (available >= n) {
return;
}
if (this.#reader === null) {
throw new Error(
`Unexpected end of buffer at position ${this.#position}: needed ${n} bytes, ${available} available`,
);
}
// Collect chunks until we have enough
const chunks = [this.#buffer.subarray(this.#offset)];
let total = available;
while (total < n) {
const { value, done } = await this.#reader.read();
if (done) {
break;
}
const chunk = Buffer.isBuffer(value) ? value : Buffer.from(value);
chunks.push(chunk);
total += chunk.length;
}
if (total < n) {
throw new Error(`Unexpected end of stream at position ${this.#position}: needed ${n} bytes, ${total} available`);
}
this.#buffer = Buffer.concat(chunks);
this.#offset = 0;
}
/**
* Total number of bytes consumed so far (monotonically increasing).
* Useful for error diagnostics.
* @returns {number}
*/
get position() {
return this.#position;
}
/**
* Read exactly `n` bytes and return them as a Buffer.
* @param {number} n
* @returns {Promise<Buffer>}
*/
async readBytes(n) {
await this.#ensure(n);
const result = this.#buffer.subarray(this.#offset, this.#offset + n);
this.#offset += n;
this.#position += n;
return result;
}
/**
* @returns {Promise<number>} unsigned 8-bit integer
*/
async readUInt8() {
await this.#ensure(1);
this.#position++;
return this.#buffer[this.#offset++];
}
/**
* @returns {Promise<number>} signed 8-bit integer
*/
async readByte() {
await this.#ensure(1);
this.#position++;
return this.#buffer.readInt8(this.#offset++);
}
/**
* @returns {Promise<number>} signed 16-bit big-endian integer
*/
async readInt16BE() {
await this.#ensure(2);
const v = this.#buffer.readInt16BE(this.#offset);
this.#offset += 2;
this.#position += 2;
return v;
}
/**
* @returns {Promise<number>} signed 32-bit big-endian integer
*/
async readInt32BE() {
await this.#ensure(4);
const v = this.#buffer.readInt32BE(this.#offset);
this.#offset += 4;
this.#position += 4;
return v;
}
/**
* @returns {Promise<bigint>} signed 64-bit big-endian integer
*/
async readBigInt64BE() {
await this.#ensure(8);
const v = this.#buffer.readBigInt64BE(this.#offset);
this.#offset += 8;
this.#position += 8;
return v;
}
/**
* @returns {Promise<number>} 32-bit big-endian float
*/
async readFloatBE() {
await this.#ensure(4);
const v = this.#buffer.readFloatBE(this.#offset);
this.#offset += 4;
this.#position += 4;
return v;
}
/**
* @returns {Promise<number>} 64-bit big-endian double
*/
async readDoubleBE() {
await this.#ensure(8);
const v = this.#buffer.readDoubleBE(this.#offset);
this.#offset += 8;
this.#position += 8;
return v;
}
}