blob: 58bdffd2c9b8b80829631c191ee2c068713b2ebc [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { MessageView } from '../message';
import { ConsumeService } from './ConsumeService';
import { MessageListener } from './MessageListener';
import type { ProcessQueue } from './ProcessQueue';
import { ILogger, getDefaultLogger } from '../client/Logger';
export class FifoConsumeService extends ConsumeService {
readonly #enableAccelerator: boolean;
readonly #groupProcessing = new Map<string /* messageGroup */, Promise<void>>();
readonly #logger: ILogger;
constructor(clientId: string, messageListener: MessageListener, enableAccelerator?: boolean) {
super(clientId, messageListener);
this.#enableAccelerator = enableAccelerator ?? false;
this.#logger = getDefaultLogger();
}
consume(pq: ProcessQueue, messageViews: MessageView[]): void {
// Use iterative consumption when accelerator is disabled or only one message
if (!this.#enableAccelerator || messageViews.length <= 1) {
this.#consumeIteratively(pq, messageViews, 0);
return;
}
this.#consumeWithAccelerator(pq, messageViews);
}
/**
* FIFO consume accelerator mode:
* - Messages with the same messageGroup are consumed sequentially
* - Messages with different messageGroups are consumed in parallel
* - Messages without messageGroup are consumed in parallel
*/
#consumeWithAccelerator(pq: ProcessQueue, messageViews: MessageView[]): void {
// Group messages by messageGroup
const groupedMessages = new Map<string, MessageView[]>();
const ungroupedMessages: MessageView[] = [];
for (const messageView of messageViews) {
if (messageView.corrupted) {
pq.discardFifoMessage(messageView);
continue;
}
const messageGroup = messageView.messageGroup || '';
if (messageGroup) {
const group = groupedMessages.get(messageGroup) || [];
group.push(messageView);
groupedMessages.set(messageGroup, group);
} else {
ungroupedMessages.push(messageView);
}
}
// Log parallel consumption info
const groupCount = groupedMessages.size + (ungroupedMessages.length > 0 ? 1 : 0);
this.#logger.debug?.('FifoConsumeService parallel consume, messageViewsNum=%d, groupNum=%d',
messageViews.length, groupCount);
// Process grouped messages (each group sequentially, groups in parallel)
for (const [ , messages ] of groupedMessages.entries()) {
this.#processMessageGroup(pq, messages);
}
// Process ungrouped messages in parallel
for (const messageView of ungroupedMessages) {
this.consumeMessage(messageView)
.then(result => pq.eraseFifoMessage(messageView, result))
.catch(() => {
// Error already logged, continue with next message
});
}
}
/**
* Process messages within the same group sequentially
*/
async #processMessageGroup(pq: ProcessQueue, messages: MessageView[]): Promise<void> {
// Check if there's already processing happening for this group
const messageGroup = messages[0]?.messageGroup || 'NO_GROUP';
const existingPromise = this.#groupProcessing.get(messageGroup);
const processTask = (async () => {
// Wait for previous processing to complete if any
if (existingPromise) {
await existingPromise.catch(() => {
// Ignore previous errors, continue processing
});
}
// Process messages in sequence
for (const messageView of messages) {
try {
const result = await this.consumeMessage(messageView);
await pq.eraseFifoMessage(messageView, result);
} catch (error) {
this.#logger.error('Failed to process FIFO message, messageGroup=%s, messageId=%s, error=%s',
messageGroup, messageView.messageId, error);
// Continue with next message even if current fails
}
}
})();
// Store the promise for this group
this.#groupProcessing.set(messageGroup, processTask);
// Clean up when done
processTask.finally(() => {
this.#groupProcessing.delete(messageGroup);
});
}
#consumeIteratively(pq: ProcessQueue, messageViews: MessageView[], index: number): void {
if (index >= messageViews.length) {
return;
}
const messageView = messageViews[index];
if (messageView.corrupted) {
pq.discardFifoMessage(messageView);
this.#consumeIteratively(pq, messageViews, index + 1);
return;
}
this.consumeMessage(messageView)
.then(result => pq.eraseFifoMessage(messageView, result))
.then(() => this.#consumeIteratively(pq, messageViews, index + 1))
.catch(() => this.#consumeIteratively(pq, messageViews, index + 1));
}
}