blob: 83f8efb29abf47eb0a99430a66b9944f672d20a7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { EventEmitter } from 'node:events';
import type {
ClientConfig,
ClientCredentials, CommandResponse,
PasswordCredentials, RawClient, TokenCredentials
} from '../client/client.type.js';
import { handleResponse } from './client.utils.js';
import { responseError } from '../wire/error.utils.js';
import { debug } from './client.debug.js';
import { IggyConnection } from './client.connection.js';
import { LOGIN, LOGIN_WITH_TOKEN, PING } from '../wire/index.js';
/**
* Command codes that can be executed without authentication.
*/
const UNLOGGED_COMMAND_CODE = [
PING.code,
LOGIN.code,
LOGIN_WITH_TOKEN.code
];
/**
* Represents a queued command job waiting to be executed.
*/
type Job = {
/** Command code */
command: number,
/** Command payload */
payload: Buffer,
/** Promise resolve function */
resolve: (v: CommandResponse | PromiseLike<CommandResponse>) => void,
/** Promise reject function */
reject: (e: unknown) => void
};
/**
* Manages command execution and response handling for the Iggy server.
* Implements command queuing, authentication, and heartbeat functionality.
*/
export class CommandResponseStream extends EventEmitter {
/** Client configuration */
private options: ClientConfig;
/** Underlying connection to the server */
private connection: IggyConnection;
/** Queue of pending command jobs */
private _execQueue: Job[];
/** Whether the stream is currently processing a command */
public busy: boolean;
/** Whether the client has been authenticated */
isAuthenticated: boolean;
/** Authenticated user ID */
userId?: number;
/** Heartbeat interval timer handle */
heartbeatIntervalHandler?: NodeJS.Timeout;
/**
* Creates a new CommandResponseStream.
*
* @param options - Client configuration
*/
constructor(options: ClientConfig) {
super();
this.options = options;
this.connection = new IggyConnection(options);
this.busy = false;
this.isAuthenticated = false;
this._execQueue = [];
this._init();
};
/**
* Initializes the stream by setting up heartbeat and connection event handlers.
*/
_init() {
this.heartbeat(this.options.heartbeatInterval);
this.connection.on('disconnected', async () => {
this.isAuthenticated = false;
});
}
/**
* Sends a command to the server.
* Automatically handles connection and authentication if needed.
*
* @param command - Command code to send
* @param payload - Command payload buffer
* @param handleResponse - Whether to parse the response (default: true)
* @param last - Whether to add to end of queue (default: true)
* @returns Promise resolving to the command response
*/
async sendCommand(
command: number,
payload: Buffer,
handleResponse = true,
last = true
): Promise<CommandResponse> {
if (!this.connection.connected)
await this.connection.connect()
if (!this.isAuthenticated && !UNLOGGED_COMMAND_CODE.includes(command))
await this.authenticate(this.options.credentials);
return new Promise((resolve, reject) => {
if (last)
this._execQueue.push({ command, payload, resolve, reject });
else
this._execQueue.unshift({ command, payload, resolve, reject });
this._processQueue(handleResponse);
});
}
/**
* Processes queued commands sequentially.
* Emits 'finishQueue' when all commands are processed.
*
* @param handleResponse - Whether to parse responses
*/
async _processQueue(handleResponse = true): Promise<void> {
if (this.busy)
return;
this.busy = true;
while (this._execQueue.length > 0 && this.connection.socket.writable) {
const next = this._execQueue.shift();
if (!next) break;
const { command, payload, resolve, reject } = next;
try {
resolve(await this._processNext(command, payload, handleResponse));
} catch (err) {
reject(err);
}
}
this.busy = false;
this.emit('finishQueue');
}
/**
* Processes a single command by writing it to the connection and waiting for response.
*
* @param command - Command code
* @param payload - Command payload
* @param handleResp - Whether to parse the response
* @returns Promise resolving to the command response
*/
_processNext(
command: number,
payload: Buffer,
handleResp = true
): Promise<CommandResponse> {
const lastWrite = this.connection.writeCommand(command, payload);
debug('==> writeCommand', lastWrite);
return new Promise((resolve, reject) => {
if (!lastWrite)
return reject(new Error('failed to write to socket'));
const errCb = (err: unknown) => reject(err);
this.connection.once('error', errCb);
this.connection.once('response', (resp) => {
this.connection.removeListener('error', errCb);
if (!handleResp) return resolve(resp);
const r = handleResponse(resp);
if (r.status !== 0) {
return reject(responseError(command, r.status));
}
return resolve(r);
});
});
}
/**
* Fails all queued commands with the given error.
*
* @param err - Error to reject all queued commands with
*/
_failQueue(err: Error) {
this._execQueue.forEach(({ reject }) => reject(err));
this._execQueue = [];
}
/**
* Authenticates the client with the server.
*
* @param creds - Authentication credentials (token or password)
* @returns True if authentication succeeded
*/
async authenticate(creds: ClientCredentials) {
const r = ('token' in creds) ?
await this._authWithToken(creds) :
await this._authWithPassword(creds);
this.isAuthenticated = true;
this.userId = r.userId;
return this.isAuthenticated;
}
/**
* Authenticates using username and password.
*
* @param creds - Password credentials
* @returns Login response with user ID
*/
async _authWithPassword(creds: PasswordCredentials) {
const pl = LOGIN.serialize(creds);
const logr = await this.sendCommand(LOGIN.code, pl, true, false);
return LOGIN.deserialize(logr);
}
/**
* Authenticates using a token.
*
* @param creds - Token credentials
* @returns Login response with user ID
*/
async _authWithToken(creds: TokenCredentials) {
const pl = LOGIN_WITH_TOKEN.serialize(creds);
const logr = await this.sendCommand(LOGIN_WITH_TOKEN.code, pl, true, false);
return LOGIN_WITH_TOKEN.deserialize(logr);
}
/**
* Sends a ping command to the server.
*
* @returns Ping response
*/
async ping() {
const pl = PING.serialize();
const pingR = await this.sendCommand(PING.code, pl, true);
return PING.deserialize(pingR);
}
/**
* Starts sending periodic heartbeat pings to keep the connection alive.
*
* @param interval - Heartbeat interval in milliseconds
*/
heartbeat(interval?: number) {
if (!interval)
return
this.heartbeatIntervalHandler = setInterval(async () => {
if (this.connection.connected) {
debug(`sending hearbeat ping (interval: ${interval} ms)`);
await this.ping()
}
}, interval);
}
/**
* Returns the underlying socket as a readable stream.
*
* @returns The connection socket
*/
getReadStream() {
return this.connection.socket;
}
/**
* Destroys the stream and cleans up resources.
* Stops heartbeat and destroys the connection.
*/
destroy() {
if (this.heartbeatIntervalHandler)
clearInterval(this.heartbeatIntervalHandler);
return this.connection._destroy();
}
};
/**
* Creates a new RawClient instance.
*
* @param options - Client configuration
* @returns RawClient instance
*/
export function getRawClient(options: ClientConfig): RawClient {
return new CommandResponseStream(options);
}