blob: ecbfbbb05e1000b999e8177789bb82cbe49bff3b [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Message, Status } from '../../proto/apache/rocketmq/v2/definition_pb';
import {
AckMessageRequest,
ChangeInvisibleDurationRequest,
ReceiveMessageRequest, ReceiveMessageResponse,
} from '../../proto/apache/rocketmq/v2/service_pb';
import { ClientType } from '../../proto/apache/rocketmq/v2/definition_pb';
import { MessageView } from '../message';
import { MessageQueue } from '../route';
import { StatusChecker } from '../exception';
import { BaseClient, BaseClientOptions } from '../client';
import { createDuration, createResource } from '../util';
import { FilterExpression } from './FilterExpression';
export interface ConsumerOptions extends BaseClientOptions {
consumerGroup: string;
}
export abstract class Consumer extends BaseClient {
protected readonly consumerGroup: string;
constructor(options: ConsumerOptions) {
super(options);
this.consumerGroup = options.consumerGroup;
}
protected wrapReceiveMessageRequest(batchSize: number, mq: MessageQueue,
filterExpression: FilterExpression, invisibleDuration: number, longPollingTimeout: number) {
return new ReceiveMessageRequest()
.setGroup(createResource(this.consumerGroup))
.setMessageQueue(mq.toProtobuf())
.setFilterExpression(filterExpression.toProtobuf())
.setLongPollingTimeout(createDuration(longPollingTimeout))
.setBatchSize(batchSize)
.setAutoRenew(false)
.setInvisibleDuration(createDuration(invisibleDuration));
}
protected async receiveMessage(request: ReceiveMessageRequest, mq: MessageQueue, awaitDuration: number) {
const endpoints = mq.broker.endpoints;
const timeout = this.requestTimeout + awaitDuration;
let status: Status.AsObject | undefined;
try {
this.logger.debug?.('Receiving messages from broker, topic=%s, endpoints=%s, batchSize=%d, clientId=%s',
request.getMessageQueue()?.getTopic()?.getName(), endpoints, request.getBatchSize(), (this as any).clientId);
const responses = await this.rpcClientManager.receiveMessage(endpoints, request, timeout);
const messageList: Message[] = [];
let transportDeliveryTimestamp: Date | undefined;
for (const response of responses) {
switch (response.getContentCase()) {
case ReceiveMessageResponse.ContentCase.STATUS:
status = response.getStatus()?.toObject();
break;
case ReceiveMessageResponse.ContentCase.MESSAGE:
messageList.push(response.getMessage()!);
break;
case ReceiveMessageResponse.ContentCase.DELIVERY_TIMESTAMP:
transportDeliveryTimestamp = response.getDeliveryTimestamp()?.toDate();
break;
default:
// this.logger.warn("[Bug] Not recognized content for receive message response, mq={}, " +
// "clientId={}, response={}", mq, clientId, response);
}
}
StatusChecker.check(status);
if (messageList.length > 0) {
this.logger.debug?.('Received %d messages successfully, topic=%s, endpoints=%s, clientId=%s',
messageList.length, request.getMessageQueue()?.getTopic()?.getName(), endpoints, (this as any).clientId);
}
const messages = messageList.map(message => new MessageView(message, mq, transportDeliveryTimestamp));
return messages;
} catch (err) {
this.logger.error('Failed to receive messages, topic=%s, endpoints=%s, clientId=%s, error=%s',
request.getMessageQueue()?.getTopic()?.getName(), endpoints, (this as any).clientId, err);
throw err;
}
}
protected async ackMessage(messageView: MessageView) {
const endpoints = messageView.endpoints;
const request = new AckMessageRequest()
.setGroup(createResource(this.consumerGroup))
.setTopic(createResource(messageView.topic));
request.addEntries()
.setMessageId(messageView.messageId)
.setReceiptHandle(messageView.receiptHandle);
const res = await this.rpcClientManager.ackMessage(endpoints, request, this.requestTimeout);
// FIXME: handle fail ack
const response = res.toObject();
StatusChecker.check(response.status);
return response.entriesList;
}
protected async invisibleDuration(messageView: MessageView, invisibleDuration: number) {
const request = new ChangeInvisibleDurationRequest()
.setGroup(createResource(this.consumerGroup))
.setTopic(createResource(messageView.topic))
.setReceiptHandle(messageView.receiptHandle)
.setInvisibleDuration(createDuration(invisibleDuration))
.setMessageId(messageView.messageId);
const res = await this.rpcClientManager.changeInvisibleDuration(messageView.endpoints, request, this.requestTimeout);
const response = res.toObject();
StatusChecker.check(response.status);
return response.receiptHandle;
}
/**
* Expose public methods for ProcessQueue to access RPC operations
*/
async ackMessageViaRpc(endpoints: any, request: AckMessageRequest, timeout: number) {
const res = await this.rpcClientManager.ackMessage(endpoints, request, timeout);
return res;
}
async changeInvisibleDurationViaRpc(endpoints: any, request: ChangeInvisibleDurationRequest, timeout: number) {
const res = await this.rpcClientManager.changeInvisibleDuration(endpoints, request, timeout);
return res;
}
async forwardMessageToDeadLetterQueueViaRpc(endpoints: any, request: any, timeout: number) {
const res = await this.rpcClientManager.forwardMessageToDeadLetterQueue(endpoints, request, timeout);
return res;
}
/**
* Get the consumer group name.
*
* @return Consumer group name
*/
getConsumerGroup(): string {
return this.consumerGroup;
}
/**
* Check if this is a lite consumer.
*
* @return true if this is a LITE_PUSH_CONSUMER
*/
protected isLiteConsumer(): boolean {
const clientType = (this as any).getClientType?.();
return clientType === ClientType.LITE_PUSH_CONSUMER;
}
}