blob: 9def6d810bcabd69eac3466a00f8b0eb0fb51aab [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Readable } from "node:stream";
import type { ClientConfig } from "../client/client.type.js";
import type { Id } from '../wire/identifier.utils.js';
import { getClient } from "../client/client.js";
import { type PollMessages } from "../wire/message/poll-messages.command.js";
import {
type PollingStrategy,
type CommandAPI,
Consumer
} from "../wire/index.js";
const wait = (interval: number, cb?: () => void): Promise<void> =>
new Promise((resolve) => {
setTimeout(() => resolve(cb ? cb() : undefined), interval)
});
async function* genEagerUntilPoll(
c: CommandAPI,
poll: PollMessages,
interval = 1000,
endOnLastOffset = false
) {
const state: Map<string, number> = new Map();
while (true) {
const r = await c.message.poll(poll);
yield r;
state.set(`${r.partitionId}`, r.count);
if (Array.from(state).every(([, last]) => last === 0)) {
if( endOnLastOffset )
return;
else
await wait(interval);
}
}
};
export type ConsumerStreamRequest = {
streamId: Id,
topicId: Id,
pollingStrategy: PollingStrategy,
count: number,
interval?: number,
autocommit?: boolean
endOnLastOffset?: boolean
}
export type SingleConsumerStreamRequest = ConsumerStreamRequest & {
partitionId: number,
};
export type GroupConsumerStreamRequest = ConsumerStreamRequest & { groupName: string };
export const singleConsumerStream = (config: ClientConfig) => async (
{
streamId,
topicId,
partitionId,
pollingStrategy,
count,
interval = 1000,
autocommit = true,
endOnLastOffset = false
}: SingleConsumerStreamRequest
): Promise<Readable> => {
const c = await getClient(config);
const poll = {
streamId,
topicId,
partitionId,
consumer: Consumer.Single,
pollingStrategy,
count,
autocommit
};
const ps = Readable.from(
genEagerUntilPoll(c, poll, interval, endOnLastOffset),
{ objectMode: true }
);
return ps;
};
export const groupConsumerStream = (config: ClientConfig) =>
async function groupConsumerStream({
groupName,
streamId,
topicId,
pollingStrategy,
count,
interval = 1000,
autocommit = true,
endOnLastOffset = false
}: GroupConsumerStreamRequest): Promise<Readable> {
const s = await getClient(config);
await s.group.ensureAndJoin(streamId, topicId, groupName);
const poll = {
streamId,
topicId,
consumer: Consumer.Group(groupName),
partitionId: null,
pollingStrategy,
count,
autocommit
};
const ps = Readable.from(
genEagerUntilPoll(s, poll, interval, endOnLastOffset),
{ objectMode: true }
);
return ps;
};