blob: f0b614d11e45f891812d5b510961068ba7631985 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import Debug from 'debug';
import { uint32ToBuf, u128ToBuf, uint8ToBuf } from '../number.utils.js';
import { serializeHeaders, type Headers } from './header.utils.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { serializePartitioning, type Partitioning } from './partitioning.utils.js';
import { parse as parseUUID } from '../uuid.utils.js';
import { serializeIggyMessageHeader } from './iggy-header.utils.js';
const debug = Debug('iggy:client');
/** Size of the message index entry in bytes (16 bytes per message) */
const INDEX_SIZE = 16;
/** Valid types for message ID: numeric, bigint, or UUID string */
export type MessageIdKind = number | bigint | string;
/**
* Message creation parameters.
*/
export type CreateMessage = {
/** Optional message ID (auto-generated if not provided) */
id?: MessageIdKind,
/** Optional user-defined headers */
headers?: Headers,
/** Message payload as string or Buffer */
payload: string | Buffer
};
/**
* Type guard to check if a value is a valid message ID.
*
* @param x - Value to check
* @returns True if the value is a valid MessageIdKind
*/
export const isValidMessageId = (x?: unknown): x is MessageIdKind =>
x === undefined ||
'string' === typeof x ||
'bigint' === typeof x ||
'number' === typeof x;
/**
* Serializes a message ID to a 16-byte buffer.
* Supports undefined (zero), numeric, bigint, and UUID string formats.
*
* @param id - Message ID to serialize
* @returns 16-byte buffer containing the serialized ID
* @throws Error if the ID format is invalid
*/
export const serializeMessageId = (id?: unknown) => {
if(!isValidMessageId(id))
throw new Error(`invalid message id: '${id}' (use uuid string | number | bigint >= 0)`)
if(id === undefined)
return Buffer.alloc(16, 0); // 0u128
if ('bigint' === typeof id || 'number' === typeof id) {
if (id < 0)
throw new Error(`invalid message id: '${id}' (numeric id must be >= 0)`)
const idValue = 'number' === typeof id ? BigInt(id) : id;
return u128ToBuf(idValue);
}
try {
const uuid = parseUUID(id);
return Buffer.from(uuid.toHex(), 'hex');
} catch (err) {
throw new Error(
`invalid message id: '${id}' (use uuid string | number | bigint >= 0)`,
{ cause: err }
)
}
}
/**
* Serializes a single message to wire format.
* Format: [iggy_header][payload][user_headers]
*
* @param msg - Message to serialize
* @returns Serialized message buffer
*/
export const serializeMessage = (msg: CreateMessage) => {
const { id, headers, payload } = msg;
const bId = serializeMessageId(id);
const bUserHeaders = serializeHeaders(headers);
const bPayload = 'string' === typeof payload ? Buffer.from(payload) : payload
const bIggyMessageHeader = serializeIggyMessageHeader(bId, bPayload, bUserHeaders);
const r = Buffer.concat([
bIggyMessageHeader,
bPayload,
bUserHeaders
]);
debug(
'id', bId.length, bId.toString('hex'),
'iggyHeaders', bIggyMessageHeader.length, bIggyMessageHeader.toString('hex'),
'userHeaders', bUserHeaders.length, bUserHeaders.toString('hex'),
'payload', bPayload.length, bPayload.toString('hex'),
'full len', r.length //, r.toString('hex')
);
return r;
};
/**
* Serializes multiple messages to an array of buffers.
*
* @param messages - Array of messages to serialize
* @returns Array of serialized message buffers
*/
export const serializeMessages = (messages: CreateMessage[]) =>
messages.map(c => serializeMessage(c));
/**
* Creates an index buffer for a batch of messages.
* Each index entry is 16 bytes tracking message positions.
*
* @param messages - Array of serialized message buffers
* @returns Index buffer
*/
export const createMessagesIndex = (messages: Buffer[]) => {
const bIndex = Buffer.allocUnsafe(messages.length * INDEX_SIZE);
let currentIndex = 0;
let msgsSize = 0;
messages.forEach(msg => {
msgsSize += msg.length;
bIndex.writeBigUInt64LE(0n, currentIndex)
bIndex.writeUInt32LE(msgsSize, currentIndex + 4)
bIndex.writeBigUInt64LE(0n, currentIndex + 8)
currentIndex += INDEX_SIZE;
});
return bIndex;
}
/**
* Serializes a send messages command payload.
* Includes stream/topic identifiers, partitioning, and all messages with index.
*
* @param streamId - Stream identifier
* @param topicId - Topic identifier
* @param messages - Array of messages to send
* @param partitioning - Optional partitioning strategy
* @returns Serialized command payload
*/
export const serializeSendMessages = (
streamId: Id,
topicId: Id,
messages: CreateMessage[],
partitioning?: Partitioning,
) => {
const streamIdentifier = serializeIdentifier(streamId);
const topicIdentifier = serializeIdentifier(topicId);
const bPartitioning = serializePartitioning(partitioning);
const bMessagesCount = uint32ToBuf(messages.length);
const bMetadataLen = uint32ToBuf(
streamIdentifier.length + topicIdentifier.length +
bPartitioning.length + bMessagesCount.length
);
const bMessagesArray = serializeMessages(messages);
const bMessageIndex = createMessagesIndex(bMessagesArray);
return Buffer.concat([
bMetadataLen,
streamIdentifier,
topicIdentifier,
bPartitioning,
bMessagesCount,
bMessageIndex,
...bMessagesArray
]);
};
/**
* Serializes a flush unsaved buffers command payload.
*
* @param streamId - Stream identifier
* @param topicId - Topic identifier
* @param partitionId - Partition ID to flush
* @param fsync - Whether to force sync to disk
* @returns Serialized command payload
*/
export const serializeFlushUnsavedBuffers = (
streamId: Id,
topicId: Id,
partitionId: number,
fsync = false
) => {
const streamIdentifier = serializeIdentifier(streamId);
const topicIdentifier = serializeIdentifier(topicId);
const bPartitionId = uint32ToBuf(partitionId);
const bFSync = uint8ToBuf(fsync ? 1 : 0);
return Buffer.concat([
streamIdentifier,
topicIdentifier,
bPartitionId,
bFSync
]);
};