blob: 71e0fc35bd73c48d12c31fea7eba745f953d357d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { EventEmitter } from 'node:events';
import type { Socket } from 'node:net';
import { createConnection } from 'node:net';
import { connect as TLSConnect } from 'node:tls';
import type { ClientConfig, TlsOption, TcpOption, ReconnectOption } from "./client.type.js"
import { serializeCommand } from './client.utils.js';
import { debug } from './client.debug.js';
/**
 * Opens a plain TCP socket.
 *
 * @param options - TCP connection options, handed unchanged to `createConnection`
 * @returns The socket returned by `createConnection`
 */
const createTcpSocket = (options: TcpOption): Socket => createConnection(options);
/**
 * Opens a TLS socket.
 *
 * @param tlsOptions - TLS connection options; `port` is split off and passed
 *   as the first argument to `tls.connect`, everything else as its options
 * @returns The TLS socket returned by `tls.connect`
 */
const createTlsSocket = (tlsOptions: TlsOption): Socket => {
  const { port, ...connectionOptions } = tlsOptions;
  return TLSConnect(port, connectionOptions);
};
/**
 * Builds the socket matching the transport named in the configuration.
 *
 * @param config - Client configuration carrying `transport` and its `options`
 * @returns A TLS socket when `transport` is `'TLS'`, otherwise a TCP socket
 */
const getTransport = (config: ClientConfig): Socket => {
  const { transport, options } = config;
  if (transport === 'TLS') {
    return createTlsSocket(options);
  }
  // 'TCP' and any other transport value both end up on a plain TCP socket
  return createTcpSocket(options);
};
/**
 * Reconnection settings used when the client configuration does not
 * override them: enabled, 5000 ms between attempts, 12 retries.
 */
const DefaultReconnectOption: ReconnectOption = {
  enabled: true,
  interval: 5000, // milliseconds
  maxRetries: 12,
};
/**
 * Builds a fresh socket once a delay has elapsed.
 * Used to space out reconnection attempts.
 *
 * @param option - Client configuration passed to `getTransport`
 * @param timer - Delay in milliseconds before the socket is created (default 1000)
 * @returns Promise resolving to the newly created socket
 */
function recreate(option: ClientConfig, timer = 1000): Promise<Socket> {
  return new Promise<Socket>((done) => {
    const openSocket = (): void => {
      done(getTransport(option));
    };
    setTimeout(openSocket, timer);
  });
}
/** An `Error` that may additionally carry a string `code` (e.g. 'ECONNRESET'). */
interface SocketError extends Error {
  code?: string;
}
/**
 * Manages the low-level TCP/TLS connection to the Iggy server.
 * Handles connection lifecycle, reconnection, and data buffering.
 *
 * Events emitted by this class:
 * - `connect`: the current socket reported 'connect'
 * - `disconnected`: the current socket reported 'end'
 * - `response`: one complete response frame (8-byte header + payload)
 * - `error`: reconnection is disabled or the retry budget is used up
 */
export class IggyConnection extends EventEmitter {
  /** Client configuration */
  public config: ClientConfig;
  /** Underlying socket connection (replaced on each reconnection) */
  public socket: Socket;
  /** Whether the connection is established */
  public connected: boolean;
  /** Whether a connection attempt is in progress */
  public connecting: boolean;
  /** Whether the connection is being intentionally closed */
  public ending: boolean;
  /** Whether waiting for more data to complete a response */
  private waitingResponseEnd: boolean;
  /** Reconnection configuration */
  private reconnectOption: ReconnectOption;
  /** Number of reconnection attempts made */
  private reconnectCount: number;
  /** Whether a reconnection attempt is already scheduled */
  private reconnecting: boolean;
  /** Buffer for incomplete response data */
  private readBuffers: Buffer;

  /**
   * Creates a new IggyConnection.
   * The socket is created immediately; handlers are attached by `connect()`.
   *
   * @param config - Client configuration
   */
  constructor(config: ClientConfig) {
    super();
    this.config = config;
    this.socket = getTransport(config);
    this.connected = false;
    this.connecting = false;
    this.ending = false;
    this.waitingResponseEnd = false;
    this.reconnectOption = { ...DefaultReconnectOption, ...config.reconnect };
    this.reconnectCount = 0;
    this.reconnecting = false;
    this.readBuffers = Buffer.allocUnsafe(0);
  }

  /**
   * Establishes the connection to the server.
   * Sets up event handlers for data, errors, and disconnection.
   *
   * The returned promise resolves on the next `connect` event of this
   * instance, so it also settles when the first attempt fails and a later
   * reconnection succeeds. It never rejects; failures are reported through
   * the `error` event.
   *
   * @returns Promise that resolves with this connection once connected
   */
  connect(): Promise<this> {
    this.connecting = true;
    const established = new Promise<this>((resolve /**, reject*/) => {
      this.once('connect', () => resolve(this));
    });
    this._bindSocket(this.socket);
    return established;
  }

  /**
   * Attaches data, error, end and connect handlers to a socket.
   * Error/end/connect events from a socket that is no longer `this.socket`
   * are ignored so a replaced socket cannot disturb the live one.
   *
   * @param socket - Socket to wire up
   */
  private _bindSocket(socket: Socket): void {
    const isCurrent = (): boolean => socket === this.socket;

    socket.on('data', this._onData.bind(this));

    socket.on('error', (err: SocketError) => {
      debug('socket/error event', err, err.code, this.ending);
      if (!isCurrent())
        return;
      this.connected = false;
      // errors about disconnections should be ignored during disconnect
      if (this.ending && (err?.code === 'ECONNRESET' || err?.code === 'EPIPE'))
        return;
      void this.reconnect(err);
    });

    // NOTE: Node's 'end' event passes no arguments, so `hadError` is
    // expected to be undefined here; it is forwarded unchanged.
    socket.once('end', (hadError?: boolean) => {
      debug('socket/close#END event', hadError);
      if (!isCurrent())
        return;
      this.connected = false;
      this.emit('disconnected', hadError);
      void this.reconnect();
    });

    socket.once('connect', () => {
      debug('socket/connect event');
      if (!isCurrent())
        return;
      this.connected = true;
      this.connecting = false;
      this.reconnectCount = 0;
      this.emit('connect');
    });
  }

  /**
   * Attempts to reconnect to the server.
   * Does nothing when the connection is being closed intentionally or when
   * another attempt is already scheduled. Makes at most `maxRetries`
   * attempts between two successful connections and emits `error` once
   * reconnection is disabled or that limit is reached.
   *
   * @param err - Optional error that triggered the reconnection
   */
  async reconnect(err?: Error) {
    const { enabled, interval, maxRetries } = this.reconnectOption;
    debug(
      'reconnect# event/reconnect?', {
        reconnect: { enabled, interval, maxRetries },
        count: this.reconnectCount,
        lastError: err
      }
    );

    // never reopen after _destroy(), never run two attempts concurrently
    if (this.ending || this.reconnecting)
      return;

    if (!enabled || this.reconnectCount >= maxRetries) {
      debug(`reconnect reached maxRetries of ${maxRetries}`, err);
      this.connecting = false;
      return this.emit(
        'error',
        new Error(
          `reconnect maxRetries exceeded (count: ${this.reconnectCount})`,
          { cause: err }
        ));
    }

    /** recreate socket */
    this.reconnecting = true;
    this.connecting = true;
    this.reconnectCount += 1;
    try {
      const next = await recreate(this.config, interval);
      // _destroy() may have been called while waiting for the delay
      if (this.ending) {
        next.destroy();
        this.connecting = false;
        return;
      }
      // bytes buffered from the previous socket must not prefix new frames
      this._endResponseWait();
      this.socket = next;
      this._bindSocket(next);
    } finally {
      this.reconnecting = false;
    }
  }

  /**
   * Destroys the connection and marks it as ending.
   * After this call no further reconnection attempt is started.
   */
  _destroy() {
    this.ending = true;
    this.connected = false;
    this.connecting = false;
    this.socket.destroy();
  }

  /**
   * Clears the response buffer and resets the waiting state.
   */
  _endResponseWait() {
    this.readBuffers = Buffer.allocUnsafe(0);
    this.waitingResponseEnd = false;
  }

  /**
   * Handles incoming data from the socket.
   * Buffers incomplete responses and emits complete ones as `response`.
   * Each frame is an 8-byte header (4 bytes status, then a 4-byte
   * little-endian payload length) followed by the payload.
   *
   * @param data - Incoming data buffer
   */
  _onData(data: Buffer) {
    debug(
      'ONDATA',
      typeof data,
      Buffer.isBuffer(data),
      data?.length,
      this.waitingResponseEnd
    );

    // Append new data to any buffered data
    if (this.waitingResponseEnd && this.readBuffers.length > 0) {
      data = Buffer.concat([this.readBuffers, data]);
    }

    // Keep processing while we have enough data
    let offset = 0;
    while (offset < data.length) {
      const remaining = data.length - offset;

      // Need at least 8 bytes for the header (4 bytes status + 4 bytes length)
      if (remaining < 8) {
        // Buffer the incomplete header and wait for more data
        this.waitingResponseEnd = true;
        this.readBuffers = data.subarray(offset);
        return;
      }

      // Read the header
      const responseSize = data.readUInt32LE(offset + 4);
      const totalSize = 8 + responseSize;

      // Check if we have the complete response (header + payload)
      if (remaining < totalSize) {
        // Buffer the incomplete response and wait for more data
        this.waitingResponseEnd = true;
        this.readBuffers = data.subarray(offset);
        return;
      }

      // We have a complete response, extract it and emit
      const response = data.subarray(offset, offset + totalSize);
      this.emit('response', response);

      // Move to the next response
      offset += totalSize;
    }

    // All data processed, reset buffers
    this._endResponseWait();
  }

  /**
   * Serializes a command and writes it to the current socket.
   *
   * @param command - Command code
   * @param payload - Command payload
   * @returns The result of `socket.write`: `false` means the data was queued
   *   in memory and the caller should wait for 'drain' before writing more;
   *   it does not indicate that the write failed
   */
  writeCommand(command: number, payload: Buffer): boolean {
    const cmd = serializeCommand(command, payload);
    return this.socket.write(cmd);
  }
}