blob: 34fce59b2ad36043ec3fee2e9968407a3bd4e275 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { type Id } from "../identifier.utils.js";
import { type ValueOf, reverseRecord } from "../../type.utils.js";
import { serializeGetOffset, type Consumer } from "../offset/offset.utils.js";
import { deserializeHeaders, type ParsedHeaderEntry } from "./header.utils.js";
import { Transform, type TransformCallback } from "node:stream";
import {
deserializeIggyMessageHeaders,
IGGY_MESSAGE_HEADER_SIZE,
type IggyMessageHeader,
} from "./iggy-header.utils.js";
/**
* Enumeration of message polling strategies.
*/
export const PollingStrategyKind = {
/** Poll from a specific offset */
Offset: 1,
/** Poll from a specific timestamp */
Timestamp: 2,
/** Poll from the first message */
First: 3,
/** Poll from the last message */
Last: 4,
/** Poll the next unconsumed message */
Next: 5,
} as const;
/** Type alias for the PollingStrategyKind object */
export type PollingStrategyKind = typeof PollingStrategyKind;
/** String literal type of polling strategy names */
export type PollingStrategyKindId = keyof PollingStrategyKind;
/** Numeric values of polling strategies */
export type PollingStrategyKindValue = ValueOf<PollingStrategyKind>;
/** Polling from a specific offset */
export type OffsetPollingStrategy = {
kind: PollingStrategyKind["Offset"];
/** Offset to start polling from */
value: bigint;
};
/** Polling from a specific timestamp */
export type TimestampPollingStrategy = {
kind: PollingStrategyKind["Timestamp"];
/** Timestamp in microseconds */
value: bigint;
};
/** Polling from the first message */
export type FirstPollingStrategy = {
kind: PollingStrategyKind["First"];
value: 0n;
};
/** Polling from the last message */
export type LastPollingStrategy = {
kind: PollingStrategyKind["Last"];
value: 0n;
};
/** Polling the next unconsumed message */
export type NextPollingStrategy = {
kind: PollingStrategyKind["Next"];
value: 0n;
};
/** Union of all polling strategy types */
export type PollingStrategy =
| OffsetPollingStrategy
| TimestampPollingStrategy
| FirstPollingStrategy
| LastPollingStrategy
| NextPollingStrategy;
/** Next polling strategy constant */
const Next: NextPollingStrategy = {
kind: PollingStrategyKind.Next,
value: 0n,
};
/** First polling strategy constant */
const First: FirstPollingStrategy = {
kind: PollingStrategyKind.First,
value: 0n,
};
/** Last polling strategy constant */
const Last: LastPollingStrategy = {
kind: PollingStrategyKind.Last,
value: 0n,
};
/**
* Creates an offset polling strategy.
*
* @param n - Offset to start from
* @returns Offset polling strategy
*/
const Offset = (n: bigint): OffsetPollingStrategy => ({
kind: PollingStrategyKind.Offset,
value: n,
});
/**
* Creates a timestamp polling strategy.
*
* @param n - Timestamp in microseconds
* @returns Timestamp polling strategy
*/
const Timestamp = (n: bigint): TimestampPollingStrategy => ({
kind: PollingStrategyKind.Timestamp,
value: n,
});
/**
* Factory object for creating polling strategies.
*/
export const PollingStrategy = {
Next,
First,
Last,
Offset,
Timestamp,
};
/**
* Serializes a poll messages command payload.
*
* @param streamId - Stream identifier
* @param topicId - Topic identifier
* @param consumer - Consumer configuration
* @param partitionId - Partition ID (null for all partitions)
* @param pollingStrategy - Strategy for selecting messages
* @param count - Maximum number of messages to poll
* @param autocommit - Whether to auto-commit offset after polling
* @returns Serialized command payload
*/
export const serializePollMessages = (
streamId: Id,
topicId: Id,
consumer: Consumer,
partitionId: number | null,
pollingStrategy: PollingStrategy, // default to OffsetPollingStrategy
count = 10,
autocommit = false,
) => {
const b = Buffer.allocUnsafe(14);
b.writeUInt8(pollingStrategy.kind, 0);
b.writeBigUInt64LE(pollingStrategy.value, 1);
b.writeUInt32LE(count, 9);
b.writeUInt8(!!autocommit ? 1 : 0, 13);
return Buffer.concat([
serializeGetOffset(streamId, topicId, consumer, partitionId),
b,
]);
};
/**
* Enumeration of message states.
*/
export const MessageState = {
/** Message is available for consumption */
Available: 1,
/** Message is temporarily unavailable */
Unavailable: 10,
/** Message processing failed */
Poisoned: 20,
/** Message is scheduled for deletion */
MarkedForDeletion: 30,
};
/** Type alias for the MessageState object */
type MessageState = typeof MessageState;
/** String literal type of message state names */
type MessageStateId = keyof MessageState;
/** Numeric values of message states */
type MessageStateValue = ValueOf<MessageState>;
/** Reverse mapping from numeric value to state name */
const ReverseMessageState = reverseRecord(MessageState);
/**
* Maps a numeric message state to its string identifier.
*
* @param k - Numeric state value
* @returns State identifier string
* @throws Error if the state is unknown
*/
export const mapMessageState = (k: number): MessageStateId => {
if (!ReverseMessageState[k as MessageStateValue])
throw new Error(`unknow message state: ${k}`);
return ReverseMessageState[k as MessageStateValue];
};
/**
* A polled message with headers, payload, and user headers.
*/
export type Message = {
/** Iggy message header metadata */
headers: IggyMessageHeader;
/** Message payload data */
payload: Buffer;
/** User-defined headers */
userHeaders: ParsedHeaderEntry[];
};
/**
* Response from a poll messages command.
*/
export type PollMessagesResponse = {
/** Partition the messages came from */
partitionId: number;
/** Current offset in the partition */
currentOffset: bigint;
/** Number of messages returned */
count: number;
/** Array of polled messages */
messages: Message[];
};
/**
* Deserializes an array of messages from a buffer.
*
* @param b - Buffer containing serialized messages
* @returns Array of deserialized messages
*/
export const deserializeMessages = (b: Buffer) => {
const messages: Message[] = [];
let pos = 0;
const len = b.length;
while (pos < len) {
if (pos + IGGY_MESSAGE_HEADER_SIZE > len) break;
const bHead = b.subarray(pos, pos + IGGY_MESSAGE_HEADER_SIZE);
const headers = deserializeIggyMessageHeaders(bHead);
pos += IGGY_MESSAGE_HEADER_SIZE;
const plEnd = pos + headers.payloadLength;
if (plEnd > len) break;
const payload = b.subarray(pos, plEnd);
pos += headers.payloadLength;
let userHeaders: ParsedHeaderEntry[] = [];
if (
headers.userHeadersLength > 0 &&
plEnd + headers.userHeadersLength <= len
) {
userHeaders = deserializeHeaders(
b.subarray(plEnd, plEnd + headers.userHeadersLength),
);
pos += headers.userHeadersLength;
}
messages.push({
headers,
payload,
userHeaders,
});
}
return messages;
};
/**
* Deserializes a poll messages response from a buffer.
*
* @param r - Response buffer
* @param pos - Starting position
* @returns Parsed PollMessagesResponse
*/
export const deserializePollMessages = (r: Buffer, pos = 0) => {
const partitionId = r.readUInt32LE(pos);
const currentOffset = r.readBigUInt64LE(pos + 4);
const count = r.readUInt32LE(pos + 12);
const messages = deserializeMessages(r.subarray(16));
return {
partitionId,
currentOffset,
count,
messages,
};
};
/**
* Creates a Transform stream for deserializing poll messages responses.
*
* @returns Transform stream that outputs PollMessagesResponse objects
*/
export const deserializePollMessagesTransform = () =>
new Transform({
objectMode: true,
transform(chunk: Buffer, encoding: BufferEncoding, cb: TransformCallback) {
try {
return cb(null, deserializePollMessages(chunk));
} catch (err: unknown) {
cb(
new Error("deserializePollMessage::transform error", { cause: err }),
null,
);
}
},
});