blob: 2cbc6d021ae030c65c173e3e88244d74591af6a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.java.impl.consumer;
import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
import apache.rocketmq.v2.VerifyMessageResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.metrics.GaugeObserver;
import org.apache.rocketmq.client.java.misc.ExcludeFromJacocoGeneratedReport;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Default implementation of {@link PushConsumer}
*
* <p>It is worth noting that in the implementation of push consumer, the message is not actively pushed by the server
* to the client, but is obtained by the client actively going to the server.
*
* @see PushConsumer
*/
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
private static final Logger log = LoggerFactory.getLogger(PushConsumerImpl.class);
/**
* Counter of successful consumptions; read and reset to zero by {@link #doStats()}.
*/
final AtomicLong consumptionOkQuantity;
/**
* Counter of failed consumptions; read and reset to zero by {@link #doStats()}.
*/
final AtomicLong consumptionErrorQuantity;
/**
* Client configuration; source of the namespace and the request timeout used by the RPCs issued here.
*/
private final ClientConfiguration clientConfiguration;
/**
* Settings of this push consumer, also returned by {@link #getSettings()}.
*/
private final PushSubscriptionSettings pushSubscriptionSettings;
private final String consumerGroup;
/**
* Subscriptions keyed by topic; the very map instance handed to the constructor, mutated by
* {@link #subscribe(String, FilterExpression)} and {@link #unsubscribe(String)}.
*/
private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
/**
* Assignments most recently stored per topic by {@link #scanAssignments()}.
*/
private final ConcurrentMap<String /* topic */, Assignments> cacheAssignments;
private final MessageListener messageListener;
/**
* Total cached message count, divided among process queues by {@link #cacheMessageCountThresholdPerQueue()}.
*/
private final int maxCacheMessageCount;
/**
* Total cached message size in bytes, divided among process queues by
* {@link #cacheMessageBytesThresholdPerQueue()}.
*/
private final int maxCacheMessageSizeInBytes;
/**
* Indicates the times of message reception; read and reset to zero by {@link #doStats()}.
*/
private final AtomicLong receptionTimes;
/**
* Indicates the quantity of received messages; read and reset to zero by {@link #doStats()}.
*/
private final AtomicLong receivedMessagesQuantity;
/**
* Fixed-size thread pool handed to the consume service; shut down in {@link #shutDown()}.
*/
private final ThreadPoolExecutor consumptionExecutor;
/**
* Process queues keyed by message queue; maintained by {@link #createProcessQueue} and
* {@link #dropProcessQueue}.
*/
private final ConcurrentMap<MessageQueueImpl, ProcessQueue> processQueueTable;
/**
* Consume service, created in {@link #startUp()}; {@code null} before that.
*/
private ConsumeService consumeService;
/**
* Handle of the periodic assignment scan scheduled in {@link #startUp()}; cancelled in {@link #shutDown()}.
*/
private volatile ScheduledFuture<?> scanAssignmentsFuture;
/**
 * Creates a push consumer for the given group and subscriptions.
 *
 * <p>The caller is supposed to have validated the arguments and handled throwing exception or logging warnings
 * already, so the checks are not repeated here.
 *
 * @param clientConfiguration        client configuration, source of namespace and request timeout.
 * @param consumerGroup              consumer group name.
 * @param subscriptionExpressions    filter expressions keyed by topic; the map is stored as-is, not copied.
 * @param messageListener            listener handed to the consume service.
 * @param maxCacheMessageCount       total cached message count shared by all process queues.
 * @param maxCacheMessageSizeInBytes total cached message size in bytes shared by all process queues.
 * @param consumptionThreadCount     both core and maximum size of the consumption thread pool.
 */
public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup,
    Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener,
    int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
    super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());

    // Arguments captured verbatim.
    this.clientConfiguration = clientConfiguration;
    this.consumerGroup = consumerGroup;
    this.subscriptionExpressions = subscriptionExpressions;
    this.messageListener = messageListener;
    this.maxCacheMessageCount = maxCacheMessageCount;
    this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;

    // Settings derived from the configuration and the inherited client identity.
    final String namespace = clientConfiguration.getNamespace();
    final Duration requestTimeout = clientConfiguration.getRequestTimeout();
    this.pushSubscriptionSettings = new PushSubscriptionSettings(namespace, clientId, endpoints,
        new Resource(namespace, consumerGroup), requestTimeout, subscriptionExpressions);

    // Runtime state, all starting out empty.
    this.cacheAssignments = new ConcurrentHashMap<>();
    this.processQueueTable = new ConcurrentHashMap<>();
    this.receptionTimes = new AtomicLong(0);
    this.receivedMessagesQuantity = new AtomicLong(0);
    this.consumptionOkQuantity = new AtomicLong(0);
    this.consumptionErrorQuantity = new AtomicLong(0);

    // Fixed-size pool (core == max) backed by an unbounded queue.
    this.consumptionExecutor = new ThreadPoolExecutor(consumptionThreadCount, consumptionThreadCount,
        60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
        new ThreadFactoryImpl("MessageConsumption", this.getClientId().getIndex()));
}
/**
 * Starts the push consumer: installs the process-queue gauge observer, starts the parent client, creates the
 * consume service and schedules the periodic assignment scan (first run after 1 second, then 5 seconds after
 * each run completes).
 *
 * @throws Exception whatever a startup step raised; {@link #shutDown()} is invoked before it is rethrown.
 */
@Override
protected void startUp() throws Exception {
    try {
        log.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
        final GaugeObserver observer = new ProcessQueueGaugeObserver(processQueueTable, clientId, consumerGroup);
        this.clientMeterManager.setGaugeObserver(observer);
        super.startUp();
        final ScheduledExecutorService scheduledExecutor = this.getClientManager().getScheduler();
        this.consumeService = createConsumeService();
        // Keep every failure inside the task: a periodic task that throws has its later runs suppressed.
        final Runnable scanTask = () -> {
            try {
                scanAssignments();
            } catch (Throwable t) {
                log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);
            }
        };
        scanAssignmentsFuture = scheduledExecutor.scheduleWithFixedDelay(scanTask, 1, 5, TimeUnit.SECONDS);
        log.info("The rocketmq push consumer starts successfully, clientId={}", clientId);
    } catch (Throwable t) {
        log.error("Exception raised while starting the rocketmq push consumer, clientId={}", clientId, t);
        // Release whatever was already started before surfacing the failure.
        shutDown();
        throw t;
    }
}
/**
 * Shuts down the push consumer: cancels the periodic assignment scan, shuts down the parent client, then shuts
 * down the consumption executor and waits for it to terminate.
 *
 * <p>The executor is asked to shut down even when the parent shutdown throws, so that its worker threads are
 * not left behind; in that case the parent's exception is propagated and termination is not awaited.
 *
 * @throws InterruptedException if raised by the parent shutdown or while awaiting executor termination.
 */
@Override
protected void shutDown() throws InterruptedException {
    log.info("Begin to shutdown the rocketmq push consumer, clientId={}", clientId);
    // Read the volatile field once; it stays null when start-up failed before scheduling the scan.
    final ScheduledFuture<?> scanFuture = scanAssignmentsFuture;
    if (null != scanFuture) {
        scanFuture.cancel(false);
    }
    try {
        super.shutDown();
    } finally {
        // shutdown() only initiates an orderly shutdown and does not block, so it is safe on the failure path.
        this.consumptionExecutor.shutdown();
    }
    ExecutorServices.awaitTerminated(consumptionExecutor);
    log.info("Shutdown the rocketmq push consumer successfully, clientId={}", clientId);
}
/**
 * Builds the consume service matching the subscription settings: a FIFO service when the settings report FIFO,
 * a standard one otherwise.
 *
 * @return the newly created consume service.
 */
private ConsumeService createConsumeService() {
    final ScheduledExecutorService scheduledExecutor = this.getClientManager().getScheduler();
    final ConsumeService service;
    if (!pushSubscriptionSettings.isFifo()) {
        log.info("Create standard consume service, consumerGroup={}, clientId={}", consumerGroup, clientId);
        service = new StandardConsumeService(clientId, messageListener, consumptionExecutor, this,
            scheduledExecutor);
    } else {
        log.info("Create FIFO consume service, consumerGroup={}, clientId={}", consumerGroup, clientId);
        service = new FifoConsumeService(clientId, messageListener, consumptionExecutor, this,
            scheduledExecutor);
    }
    return service;
}
/**
 * Returns the consumer group this push consumer was created with.
 *
 * @see PushConsumer#getConsumerGroup()
 */
@Override
public String getConsumerGroup() {
    return this.consumerGroup;
}
/**
 * Returns the push subscription settings built in the constructor.
 *
 * @return the settings instance, the same one exposed by {@link #getSettings()}.
 */
public PushSubscriptionSettings getPushConsumerSettings() {
    return this.pushSubscriptionSettings;
}
/**
 * Returns a copy of the current subscriptions, so that callers cannot modify the consumer's own map.
 *
 * @see PushConsumer#getSubscriptionExpressions()
 */
@Override
public Map<String, FilterExpression> getSubscriptionExpressions() {
    final Map<String, FilterExpression> snapshot = new HashMap<>(subscriptionExpressions);
    return snapshot;
}
/**
 * Adds (or replaces) the filter expression of a topic while the consumer is running.
 *
 * <p>The route of the topic is requested and handed to {@code handleClientFuture} before the subscription is
 * recorded, so a failure raised there leaves the subscriptions untouched.
 *
 * @param topic            topic to subscribe to.
 * @param filterExpression filter expression applied to the topic.
 * @return this consumer.
 * @throws ClientException       declared for failures raised while handling the route future.
 * @throws IllegalStateException if the push consumer is not running.
 * @see PushConsumer#subscribe(String, FilterExpression)
 */
@Override
public PushConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException {
    if (this.isRunning()) {
        final ListenableFuture<TopicRouteData> routeFuture = getRouteData(topic);
        handleClientFuture(routeFuture);
        subscriptionExpressions.put(topic, filterExpression);
        return this;
    }
    // Subscriptions may only be changed on a running consumer.
    log.error("Unable to add subscription because push consumer is not running, state={}, clientId={}",
        this.state(), clientId);
    throw new IllegalStateException("Push consumer is not running now");
}
/**
 * Removes the subscription of a topic while the consumer is running and stops fetching from its queues.
 *
 * <p>Besides removing the filter expression, the cached assignments of the topic are evicted and every process
 * queue belonging to the topic is dropped. {@link #scanAssignments()} only visits topics that are still
 * subscribed, so without this cleanup those process queues would never be dropped.
 *
 * <p>NOTE(review): an assignment query that is already in flight when this method runs may still re-create
 * process queues for the topic once its callback fires - confirm whether that window needs guarding.
 *
 * @param topic topic to unsubscribe from.
 * @return this consumer.
 * @throws IllegalStateException if the push consumer is not running.
 * @see PushConsumer#unsubscribe(String)
 */
@Override
public PushConsumer unsubscribe(String topic) {
    // Check consumer status.
    if (!this.isRunning()) {
        log.error("Unable to remove subscription because push consumer is not running, state={}, clientId={}",
            this.state(), clientId);
        throw new IllegalStateException("Push consumer is not running now");
    }
    subscriptionExpressions.remove(topic);
    // Forget the cached assignments so that a later re-subscription starts from a clean state.
    cacheAssignments.remove(topic);
    // The key set view of a ConcurrentMap tolerates removals performed while it is being iterated.
    for (MessageQueueImpl mq : processQueueTable.keySet()) {
        if (mq.getTopic().equals(topic)) {
            log.info("Drop message queue because its topic is unsubscribed, mq={}, clientId={}", mq, clientId);
            dropProcessQueue(mq);
        }
    }
    return this;
}
/**
 * Resolves the route of the topic and picks, from that route, the endpoints to query assignments from.
 *
 * @param topic topic whose route is looked up.
 * @return future of the picked endpoints; it fails if the route lookup or the pick fails.
 */
private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String topic) {
    return Futures.transformAsync(getRouteData(topic),
        routeData -> Futures.immediateFuture(routeData.pickEndpointsToQueryAssignments()),
        MoreExecutors.directExecutor());
}
/**
 * Builds the protobuf request used to query the assignments of a topic for this consumer group.
 *
 * @param topic topic name; it is qualified with the namespace of the client configuration.
 * @return request carrying the topic resource, this client's endpoints and the consumer group.
 */
private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
    final apache.rocketmq.v2.Resource.Builder topicBuilder = apache.rocketmq.v2.Resource.newBuilder();
    topicBuilder.setResourceNamespace(clientConfiguration.getNamespace());
    topicBuilder.setName(topic);

    final QueryAssignmentRequest.Builder requestBuilder = QueryAssignmentRequest.newBuilder();
    requestBuilder.setTopic(topicBuilder.build());
    requestBuilder.setEndpoints(endpoints.toProtobuf());
    requestBuilder.setGroup(getProtobufGroup());
    return requestBuilder.build();
}
/**
 * Queries the assignments of a topic from remote.
 *
 * <p>The endpoints are picked from the topic route first, then the query is sent to them; the response status
 * is checked with {@link StatusChecker} before the assignments are extracted.
 *
 * @param topic topic to query assignments for.
 * @return future of the assignments; it fails if any step fails or the status check rejects the response.
 */
ListenableFuture<Assignments> queryAssignment(final String topic) {
    final ListenableFuture<Endpoints> endpointsFuture = pickEndpointsToQueryAssignments(topic);
    return Futures.transformAsync(endpointsFuture, target -> {
        final QueryAssignmentRequest queryRequest = wrapQueryAssignmentRequest(topic);
        final Duration timeout = clientConfiguration.getRequestTimeout();
        final RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> rpcFuture =
            this.getClientManager().queryAssignment(target, queryRequest, timeout);
        return Futures.transformAsync(rpcFuture, reply -> {
            // Throws when the status does not indicate success, which fails the returned future.
            StatusChecker.check(reply.getStatus(), rpcFuture);
            final List<Assignment> converted = reply.getAssignmentsList().stream()
                .map(item -> new MessageQueueImpl(item.getMessageQueue()))
                .map(mq -> new Assignment(mq))
                .collect(Collectors.toList());
            return Futures.immediateFuture(new Assignments(converted));
        }, MoreExecutors.directExecutor());
    }, MoreExecutors.directExecutor());
}
/**
 * Removes the {@link ProcessQueue} mapped to the given {@link MessageQueueImpl} and then drops it, so a process
 * queue is always removed from the table before it is dropped. Does nothing when no process queue is mapped.
 *
 * @param mq message queue.
 */
void dropProcessQueue(MessageQueueImpl mq) {
    Optional.ofNullable(processQueueTable.remove(mq)).ifPresent(ProcessQueue::drop);
}
/**
 * Creates a process queue and registers it in {@link #processQueueTable}; returns {@link Optional#empty()} when
 * a process queue is already mapped to the message queue, in which case the existing mapping is kept.
 *
 * <p>Together with {@link #dropProcessQueue(MessageQueueImpl)} this makes sure that a process queue contained
 * in {@link #processQueueTable} is not dropped, and that a dropped process queue has already been removed from
 * {@link #processQueueTable}.
 *
 * @param mq               message queue.
 * @param filterExpression filter expression of topic.
 * @return optional process queue.
 */
protected Optional<ProcessQueue> createProcessQueue(MessageQueueImpl mq, final FilterExpression filterExpression) {
    final ProcessQueue candidate = new ProcessQueueImpl(this, mq, filterExpression);
    // putIfAbsent yields null only when the candidate has actually been registered.
    final boolean registered = null == processQueueTable.putIfAbsent(mq, candidate);
    return registered ? Optional.of(candidate) : Optional.empty();
}
/**
 * Builds the heartbeat request of this client, carrying the consumer group and the push consumer client type.
 *
 * @return heartbeat request.
 */
@Override
public HeartbeatRequest wrapHeartbeatRequest() {
    final HeartbeatRequest.Builder heartbeatBuilder = HeartbeatRequest.newBuilder();
    heartbeatBuilder.setGroup(getProtobufGroup());
    heartbeatBuilder.setClientType(ClientType.PUSH_CONSUMER);
    return heartbeatBuilder.build();
}
/**
 * Aligns the process queues of a topic with the given assignments.
 *
 * <p>Process queues of the topic that are no longer assigned, or that report themselves expired, are dropped.
 * After that a process queue is created - and its first fetch triggered - for every assigned message queue
 * that has no retained process queue.
 *
 * @param topic            topic being synchronized; process queues of other topics are left alone.
 * @param assignments      latest assignments of the topic.
 * @param filterExpression filter expression used for newly created process queues.
 */
@VisibleForTesting
void syncProcessQueue(String topic, Assignments assignments, FilterExpression filterExpression) {
    final Set<MessageQueueImpl> assigned = new HashSet<>();
    assignments.getAssignmentList().forEach(item -> assigned.add(item.getMessageQueue()));

    // Pass 1: drop stale or expired process queues of this topic and remember the ones that stay.
    final Set<MessageQueueImpl> retained = new HashSet<>();
    for (Map.Entry<MessageQueueImpl, ProcessQueue> tableEntry : processQueueTable.entrySet()) {
        final MessageQueueImpl mq = tableEntry.getKey();
        if (!topic.equals(mq.getTopic())) {
            continue;
        }
        if (!assigned.contains(mq)) {
            log.info("Drop message queue according to the latest assignmentList, mq={}, clientId={}", mq,
                clientId);
            dropProcessQueue(mq);
        } else if (tableEntry.getValue().expired()) {
            log.warn("Drop message queue because it is expired, mq={}, clientId={}", mq, clientId);
            dropProcessQueue(mq);
        } else {
            retained.add(mq);
        }
    }

    // Pass 2: start fetching for every assigned message queue without a retained process queue.
    for (MessageQueueImpl mq : assigned) {
        if (retained.contains(mq)) {
            continue;
        }
        createProcessQueue(mq, filterExpression).ifPresent(created -> {
            log.info("Start to fetch message from remote, mq={}, clientId={}", mq, clientId);
            created.fetchMessageImmediately();
        });
    }
}
/**
 * Queries the latest assignments of every subscribed topic and synchronizes the process queues accordingly.
 *
 * <p>For each topic: when both the latest and the cached assignments are empty (or nothing is cached) nothing
 * is done; otherwise the process queues are synchronized, and the cache is updated when the assignments
 * differ from the cached ones. Failures are logged and never propagated to the caller.
 */
@VisibleForTesting
void scanAssignments() {
    try {
        log.debug("Start to scan assignments periodically, clientId={}", clientId);
        for (Map.Entry<String, FilterExpression> subscription : subscriptionExpressions.entrySet()) {
            final String topic = subscription.getKey();
            final FilterExpression filterExpression = subscription.getValue();
            // Snapshot of the cache taken before the query is issued.
            final Assignments existed = cacheAssignments.get(topic);
            Futures.addCallback(queryAssignment(topic), new FutureCallback<Assignments>() {
                @Override
                public void onSuccess(Assignments latest) {
                    final boolean latestEmpty = latest.getAssignmentList().isEmpty();
                    if (latestEmpty) {
                        final boolean existedEmpty = null == existed || existed.getAssignmentList().isEmpty();
                        if (existedEmpty) {
                            log.info("Acquired empty assignments from remote, would scan later, topic={}, "
                                + "clientId={}", topic, clientId);
                            return;
                        }
                        log.info("Attention!!! acquired empty assignments from remote, but existed assignments"
                            + " is not empty, topic={}, clientId={}", topic, clientId);
                    }
                    if (latest.equals(existed)) {
                        log.debug("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
                            existed, clientId);
                        // Process queue may be dropped, need to be synchronized anyway.
                        syncProcessQueue(topic, latest, filterExpression);
                        return;
                    }
                    log.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed,
                        latest, clientId);
                    syncProcessQueue(topic, latest, filterExpression);
                    cacheAssignments.put(topic, latest);
                }

                @Override
                public void onFailure(Throwable t) {
                    log.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic,
                        clientId, t);
                }
            }, MoreExecutors.directExecutor());
        }
    } catch (Throwable t) {
        log.error("Exception raised while scanning the assignments for all topics, clientId={}", clientId, t);
    }
}
/**
 * Returns the settings of this client, which for a push consumer are its push subscription settings.
 *
 * @return the push subscription settings.
 */
@Override
public Settings getSettings() {
    return this.pushSubscriptionSettings;
}
/**
 * Requests this consumer to stop and waits until it has terminated.
 *
 * @see PushConsumer#close()
 */
@Override
public void close() {
    stopAsync().awaitTerminated();
}
/**
 * Returns the number of process queues currently registered in {@link #processQueueTable}.
 *
 * @return process queue count.
 */
int getQueueSize() {
    return this.processQueueTable.size();
}
/**
 * Computes the per-queue share of the cached message size budget.
 *
 * @return 0 when there is no process queue, otherwise the total byte budget divided evenly among the process
 *     queues, but never less than 1.
 */
int cacheMessageBytesThresholdPerQueue() {
    final int queueCount = this.getQueueSize();
    // All process queues are removed, no need to cache messages.
    return queueCount <= 0 ? 0 : Math.max(1, maxCacheMessageSizeInBytes / queueCount);
}
/**
 * Computes the per-queue share of the cached message count budget.
 *
 * @return 0 when there is no process queue, otherwise the total count budget divided evenly among the process
 *     queues, but never less than 1.
 */
int cacheMessageCountThresholdPerQueue() {
    final int queueCount = this.getQueueSize();
    // All process queues are removed, no need to cache messages.
    return queueCount <= 0 ? 0 : Math.max(1, maxCacheMessageCount / queueCount);
}
/**
 * Returns the live counter of message receptions; {@link #doStats()} resets it to zero.
 *
 * @return the counter itself, not a copy.
 */
public AtomicLong getReceptionTimes() {
    return this.receptionTimes;
}
/**
 * Returns the live counter of received messages; {@link #doStats()} resets it to zero.
 *
 * @return the counter itself, not a copy.
 */
public AtomicLong getReceivedMessagesQuantity() {
    return this.receivedMessagesQuantity;
}
/**
 * Returns the consume service created during {@link #startUp()}.
 *
 * @return the consume service, or {@code null} if the consumer has not created it yet.
 */
public ConsumeService getConsumeService() {
    return this.consumeService;
}
/**
 * Handles a message verification command: the carried message is consumed through the consume service and the
 * outcome is reported back to the given endpoints as a telemetry command echoing the nonce of the command.
 *
 * @param endpoints            endpoints the verification result is sent to.
 * @param verifyMessageCommand command carrying the message to verify and the nonce to echo.
 */
@Override
public void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand verifyMessageCommand) {
    final String nonce = verifyMessageCommand.getNonce();
    final MessageViewImpl messageView = MessageViewImpl.fromProtobuf(verifyMessageCommand.getMessage());
    final MessageId messageId = messageView.getMessageId();
    final ListenableFuture<ConsumeResult> consumption = consumeService.consume(messageView);
    Futures.addCallback(consumption, new FutureCallback<ConsumeResult>() {
        @Override
        public void onSuccess(ConsumeResult consumeResult) {
            final Code code;
            if (ConsumeResult.SUCCESS.equals(consumeResult)) {
                code = Code.OK;
            } else {
                code = Code.FAILED_TO_CONSUME_MESSAGE;
            }
            final TelemetryCommand command = TelemetryCommand.newBuilder()
                .setVerifyMessageResult(VerifyMessageResult.newBuilder().setNonce(nonce).build())
                .setStatus(Status.newBuilder().setCode(code).build())
                .build();
            try {
                telemetry(endpoints, command);
            } catch (Throwable t) {
                log.error("Failed to send message verification result command, endpoints={}, command={}, "
                    + "messageId={}, clientId={}", endpoints, command, messageId, clientId, t);
            }
        }

        @Override
        public void onFailure(Throwable t) {
            // Should never reach here.
            log.error("[Bug] Failed to get message verification result, endpoints={}, messageId={}, "
                + "clientId={}", endpoints, messageId, clientId, t);
        }
    }, MoreExecutors.directExecutor());
}
/**
 * Builds the protobuf request that forwards a message to the dead letter queue.
 *
 * @param messageView message to forward; its topic, receipt handle, message id and delivery attempt are
 *                    copied into the request.
 * @return request that additionally carries the consumer group and the max attempts of the retry policy.
 */
private ForwardMessageToDeadLetterQueueRequest wrapForwardMessageToDeadLetterQueueRequest(
    MessageViewImpl messageView) {
    final apache.rocketmq.v2.Resource.Builder topicBuilder = apache.rocketmq.v2.Resource.newBuilder();
    topicBuilder.setResourceNamespace(clientConfiguration.getNamespace());
    topicBuilder.setName(messageView.getTopic());

    final ForwardMessageToDeadLetterQueueRequest.Builder requestBuilder =
        ForwardMessageToDeadLetterQueueRequest.newBuilder();
    requestBuilder.setGroup(getProtobufGroup());
    requestBuilder.setTopic(topicBuilder.build());
    requestBuilder.setReceiptHandle(messageView.getReceiptHandle());
    requestBuilder.setMessageId(messageView.getMessageId().toString());
    requestBuilder.setDeliveryAttempt(messageView.getDeliveryAttempt());
    requestBuilder.setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts());
    return requestBuilder.build();
}
/**
 * Forwards a message to the dead letter queue through the endpoints the message view reports.
 *
 * <p>The message interceptors are invoked before the request is sent and again once the RPC completes: with
 * {@link MessageHookPointsStatus#OK} when the response code is {@link Code#OK}, and with
 * {@link MessageHookPointsStatus#ERROR} for any other code or when the RPC fails.
 *
 * @param messageView message to forward.
 * @return future of the RPC.
 */
public RpcFuture<ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse>
    forwardMessageToDeadLetterQueue(final MessageViewImpl messageView) {
    // Intercept before forwarding message to DLQ.
    final List<GeneralMessage> intercepted = Collections.singletonList(new GeneralMessageImpl(messageView));
    final MessageInterceptorContextImpl context =
        new MessageInterceptorContextImpl(MessageHookPoints.FORWARD_TO_DLQ);
    doBefore(context, intercepted);

    final Endpoints target = messageView.getEndpoints();
    final ForwardMessageToDeadLetterQueueRequest dlqRequest =
        wrapForwardMessageToDeadLetterQueueRequest(messageView);
    final RpcFuture<ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse> rpcFuture =
        this.getClientManager().forwardMessageToDeadLetterQueue(target, dlqRequest,
            clientConfiguration.getRequestTimeout());

    Futures.addCallback(rpcFuture, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
        @Override
        public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
            // Intercept after forwarding message to DLQ.
            final MessageHookPointsStatus hookStatus;
            if (Code.OK.equals(response.getStatus().getCode())) {
                hookStatus = MessageHookPointsStatus.OK;
            } else {
                hookStatus = MessageHookPointsStatus.ERROR;
            }
            final MessageInterceptorContext after = new MessageInterceptorContextImpl(context, hookStatus);
            doAfter(after, intercepted);
        }

        @Override
        public void onFailure(Throwable t) {
            // Intercept after forwarding message to DLQ.
            final MessageInterceptorContext after = new MessageInterceptorContextImpl(context,
                MessageHookPointsStatus.ERROR);
            doAfter(after, intercepted);
        }
    }, MoreExecutors.directExecutor());
    return rpcFuture;
}
/**
 * Logs the reception and consumption counters accumulated since the previous call, resetting each of them to
 * zero, and then lets every process queue report its own statistics.
 */
@ExcludeFromJacocoGeneratedReport
@Override
public void doStats() {
    // getAndSet(0) reads and resets each counter in one atomic step.
    final long receptions = this.receptionTimes.getAndSet(0);
    final long received = this.receivedMessagesQuantity.getAndSet(0);
    final long consumedOk = this.consumptionOkQuantity.getAndSet(0);
    final long consumedError = this.consumptionErrorQuantity.getAndSet(0);
    log.info("clientId={}, consumerGroup={}, receptionTimes={}, receivedMessagesQuantity={}, "
        + "consumptionOkQuantity={}, consumptionErrorQuantity={}", clientId, consumerGroup, receptions,
        received, consumedOk, consumedError);
    for (ProcessQueue pq : processQueueTable.values()) {
        pq.doStats();
    }
}
/**
 * Returns the retry policy currently held by the push subscription settings.
 *
 * @return retry policy.
 */
public RetryPolicy getRetryPolicy() {
    return this.pushSubscriptionSettings.getRetryPolicy();
}
/**
 * Returns the thread pool used for message consumption.
 *
 * @return the executor itself; it is shut down by {@link #shutDown()}.
 */
public ThreadPoolExecutor getConsumptionExecutor() {
    return this.consumptionExecutor;
}
}