| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| |
| import { EventEmitter } from 'node:events'; |
| import type { Socket } from 'node:net'; |
| import { createConnection } from 'node:net'; |
| import { connect as TLSConnect } from 'node:tls'; |
| import type { ClientConfig, TlsOption, TcpOption, ReconnectOption } from "./client.type.js" |
| import { serializeCommand } from './client.utils.js'; |
| import { debug } from './client.debug.js'; |
| |
| |
/**
 * Opens a plain TCP connection.
 *
 * @param options - Options forwarded unchanged to `createConnection` from `node:net`
 * @returns The socket produced by `createConnection`
 */
const createTcpSocket = (options: TcpOption): Socket => createConnection(options);
| |
/**
 * Opens a TLS connection.
 *
 * The `port` field is split off and passed as the first argument of
 * `connect` from `node:tls`; every remaining field is passed as its options.
 *
 * @param tlsOptions - TLS connection options, including the port
 * @returns The socket produced by `connect` from `node:tls`
 */
const createTlsSocket = (tlsOptions: TlsOption): Socket => {
  const { port, ...rest } = tlsOptions;
  return TLSConnect(port, rest);
};
| |
/**
 * Builds the socket matching the transport named in the configuration.
 *
 * `'TLS'` yields a TLS socket; `'TCP'` and any other value fall back to a
 * plain TCP socket.
 *
 * @param config - Client configuration carrying the transport and its options
 * @returns Socket for the selected transport
 */
const getTransport = (config: ClientConfig): Socket => {
  if (config.transport === 'TLS') {
    return createTlsSocket(config.options);
  }
  // 'TCP' and every unrecognised transport use a plain TCP socket
  return createTcpSocket(config.options);
};
| |
/**
 * Baseline reconnection settings.
 * Reconnection is enabled, with a 5 second delay before each attempt
 * and a `maxRetries` limit of 12.
 */
const DefaultReconnectOption: ReconnectOption = {
  // reconnect automatically
  enabled: true,
  // delay before each attempt, in milliseconds
  interval: 5_000,
  // retry limit
  maxRetries: 12,
};
| |
/**
 * Recreates a socket after a delay.
 * Used for reconnection attempts.
 *
 * The socket is built inside the timer callback. If building it throws
 * synchronously, the returned promise rejects with that error rather than
 * letting it escape the timer as an uncaught exception.
 *
 * @param option - Client configuration
 * @param timer - Delay in milliseconds before recreating
 * @returns Promise resolving to a new socket, or rejecting if the socket could not be created
 */
function recreate(option: ClientConfig, timer = 1000): Promise<Socket> {
  return new Promise<Socket>((resolve, reject) => {
    setTimeout(() => {
      try {
        resolve(getTransport(option));
      } catch (err: unknown) {
        // A throw here happens outside the promise executor; forward it by hand
        reject(err);
      }
    }, timer);
  });
}
| |
/** An `Error` that may additionally carry a string error `code` */
interface SocketError extends Error {
  code?: string;
}
| |
/**
 * Manages the low-level TCP/TLS connection to the Iggy server.
 * Handles connection lifecycle, reconnection, and data buffering.
 *
 * Events emitted by this class:
 * - `'connect'`: the current socket emitted `'connect'`
 * - `'disconnected'`: the current socket emitted `'end'`
 * - `'response'`: one complete response frame (8-byte header + payload)
 * - `'error'`: reconnection is disabled, exhausted, or failed
 */
export class IggyConnection extends EventEmitter {
  /** Client configuration */
  public config: ClientConfig;
  /** Underlying socket connection */
  public socket: Socket;

  /** Whether the connection is established */
  public connected: boolean;
  /** Whether a connection attempt is in progress */
  public connecting: boolean;
  /** Whether the connection is being intentionally closed */
  public ending: boolean;
  /** Whether waiting for more data to complete a response */
  private waitingResponseEnd: boolean;
  /** Reconnection configuration */
  private reconnectOption: ReconnectOption;
  /** Number of reconnection attempts made since the last successful connect */
  private reconnectCount: number;
  /** Whether a reconnection attempt is currently waiting for its new socket */
  private reconnecting: boolean;

  /** Buffer for incomplete response data */
  private readBuffers: Buffer;

  /**
   * Creates a new IggyConnection.
   * The socket is created immediately; event handlers are only attached
   * once `connect()` is called.
   *
   * @param config - Client configuration
   */
  constructor(config: ClientConfig) {
    super();
    this.config = config;
    this.socket = getTransport(config);
    this.connected = false;
    this.connecting = false;
    this.ending = false;
    this.waitingResponseEnd = false;
    this.reconnectOption = { ...DefaultReconnectOption, ...config.reconnect };
    this.reconnectCount = 0;
    this.reconnecting = false;
    this.readBuffers = Buffer.allocUnsafe(0);
  }

  /**
   * Attaches the data, error and end handlers to the current socket and
   * waits for it to connect.
   *
   * The returned promise is tied to the socket that is current when this
   * method is called: it resolves with this connection on that socket's
   * `'connect'` event and never rejects. If that socket fails and a
   * reconnection replaces it, listen for the `'connect'` / `'error'` events
   * of this connection instead.
   *
   * @returns Promise that resolves with this connection once connected
   */
  connect() {
    this.connecting = true;
    const socket = this.socket;

    socket.on('data', this._onData.bind(this));

    socket.on('error', (err: SocketError) => {
      debug('socket/error event', err, err.code, this.ending);
      // errors about disconnections should be ignored during disconnect
      if (this.ending && (err?.code === 'ECONNRESET' || err?.code === 'EPIPE'))
        return;

      void this.reconnect(err);
    });

    // NOTE(review): Node's 'end' event carries no arguments, so `hadError`
    // is expected to be undefined here; it is forwarded as-is.
    socket.once('end', (hadError?: boolean) => {
      debug('socket/close#END event', hadError);
      this.connected = false;
      this.emit('disconnected', hadError);
      void this.reconnect();
    });

    return new Promise((resolve /**, reject*/) => {
      socket.once('connect', () => {
        debug('socket/connect event');
        this.connected = true;
        this.connecting = false;
        this.reconnectCount = 0;
        this.emit('connect');
        resolve(this);
      });
    });
  }

  /**
   * Attempts to reconnect to the server.
   *
   * Does nothing when the connection is intentionally ending or when another
   * reconnection attempt is already pending. Emits `'error'` when
   * reconnection is disabled, when `maxRetries` attempts have already been
   * made, or when the replacement socket cannot be created. Otherwise it
   * discards the old socket and any partially buffered response, waits for
   * the configured interval, and connects a fresh socket.
   *
   * @param err - Optional error that triggered the reconnection
   */
  async reconnect(err?: Error) {
    const { enabled, interval, maxRetries } = this.reconnectOption;
    debug(
      'reconnect# event/reconnect?', {
        reconnect: { enabled, interval, maxRetries },
        count: this.reconnectCount,
        lastError: err
      }
    );

    // An intentionally closed connection must not come back to life
    if (this.ending) {
      debug('reconnect skipped: connection is ending');
      return;
    }

    // 'end' and 'error' may both fire for the same dead socket; only the
    // first one is allowed to schedule a new socket
    if (this.reconnecting) {
      debug('reconnect skipped: attempt already pending');
      return;
    }

    // `>=` so that at most `maxRetries` attempts are made
    if (!enabled || this.reconnectCount >= maxRetries) {
      debug(`reconnect reached maxRetries of ${maxRetries}`, err);
      return this.emit(
        'error',
        new Error(
          `reconnect maxRetries exceeded (count: ${this.reconnectCount})`,
          { cause: err }
        ));
    }

    this.reconnecting = true;
    this.connecting = true;
    this.connected = false;
    this.reconnectCount += 1;

    // Release the dead socket and drop bytes of a response it never finished,
    // otherwise they would be prepended to the new socket's first response
    this.socket.destroy();
    this._endResponseWait();

    try {
      /** recreate socket */
      const socket = await recreate(this.config, interval);

      // _destroy() may have been called while waiting for the interval
      if (this.ending) {
        socket.destroy();
        this.connecting = false;
        return;
      }
      this.socket = socket;
    } catch (cause: unknown) {
      debug('reconnect failed to create socket', cause);
      this.connecting = false;
      return this.emit(
        'error',
        new Error('reconnect failed to create socket', { cause })
      );
    } finally {
      this.reconnecting = false;
    }

    this.connect();
  }

  /**
   * Destroys the connection and marks it as ending.
   */
  _destroy() {
    this.ending = true;
    this.socket.destroy();
  }

  /**
   * Clears the response buffer and resets the waiting state.
   */
  _endResponseWait() {
    this.readBuffers = Buffer.allocUnsafe(0);
    this.waitingResponseEnd = false;
  }

  /**
   * Handles incoming data from the socket.
   * Buffers incomplete responses and emits complete ones.
   *
   * Each response is framed as an 8-byte header followed by a payload whose
   * byte length is the little-endian uint32 at header offset 4. One
   * `'response'` event is emitted per complete frame, header included.
   *
   * @param data - Incoming data buffer
   */
  _onData(data: Buffer) {
    debug(
      'ONDATA',
      typeof data,
      Buffer.isBuffer(data),
      data?.length,
      this.waitingResponseEnd
    );

    // Append new data to any buffered data
    if (this.waitingResponseEnd && this.readBuffers.length > 0) {
      data = Buffer.concat([this.readBuffers, data]);
    }

    // Keep processing while we have enough data
    let offset = 0;

    while (offset < data.length) {
      const remaining = data.length - offset;

      // Need at least 8 bytes for the header (4 bytes status + 4 bytes length)
      if (remaining < 8) {
        // Buffer the incomplete header and wait for more data
        this.waitingResponseEnd = true;
        this.readBuffers = data.subarray(offset);
        return;
      }

      // Read the header
      const responseSize = data.readUInt32LE(offset + 4);
      const totalSize = 8 + responseSize;

      // Check if we have the complete response (header + payload)
      if (remaining < totalSize) {
        // Buffer the incomplete response and wait for more data
        this.waitingResponseEnd = true;
        this.readBuffers = data.subarray(offset);
        return;
      }

      // We have a complete response, extract it and emit
      const response = data.subarray(offset, offset + totalSize);
      this.emit('response', response);

      // Move to the next response
      offset += totalSize;
    }

    // All data processed, reset buffers
    this._endResponseWait();
  }

  /**
   * Serializes a command and writes it to the current socket.
   *
   * @param command - Command code
   * @param payload - Command payload
   * @returns The return value of the socket's `write` call
   */
  writeCommand(command: number, payload: Buffer): boolean {
    const cmd = serializeCommand(command, payload);
    return this.socket.write(cmd);
  }
}