| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.rocketmq.broker.processor; |
| |
| import com.alibaba.fastjson.JSON; |
| import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelFutureListener; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.channel.FileRegion; |
| import io.opentelemetry.api.common.Attributes; |
| import java.nio.ByteBuffer; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Random; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.rocketmq.broker.BrokerController; |
| import org.apache.rocketmq.broker.filter.ConsumerFilterData; |
| import org.apache.rocketmq.broker.filter.ConsumerFilterManager; |
| import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; |
| import org.apache.rocketmq.broker.longpolling.PollingHeader; |
| import org.apache.rocketmq.broker.longpolling.PollingResult; |
| import org.apache.rocketmq.broker.longpolling.PopLongPollingService; |
| import org.apache.rocketmq.broker.longpolling.PopRequest; |
| import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; |
| import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; |
| import org.apache.rocketmq.common.BrokerConfig; |
| import org.apache.rocketmq.common.KeyBuilder; |
| import org.apache.rocketmq.common.MixAll; |
| import org.apache.rocketmq.common.PopAckConstants; |
| import org.apache.rocketmq.common.ServiceThread; |
| import org.apache.rocketmq.common.TopicConfig; |
| import org.apache.rocketmq.common.constant.ConsumeInitMode; |
| import org.apache.rocketmq.common.constant.LoggerName; |
| import org.apache.rocketmq.common.constant.PermName; |
| import org.apache.rocketmq.common.filter.ExpressionType; |
| import org.apache.rocketmq.common.help.FAQUrl; |
| import org.apache.rocketmq.common.message.MessageConst; |
| import org.apache.rocketmq.common.message.MessageDecoder; |
| import org.apache.rocketmq.common.message.MessageExt; |
| import org.apache.rocketmq.common.message.MessageExtBrokerInner; |
| import org.apache.rocketmq.common.topic.TopicValidator; |
| import org.apache.rocketmq.common.utils.DataConverter; |
| import org.apache.rocketmq.logging.org.slf4j.Logger; |
| import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; |
| import org.apache.rocketmq.remoting.common.RemotingHelper; |
| import org.apache.rocketmq.remoting.exception.RemotingCommandException; |
| import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager; |
| import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract; |
| import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; |
| import org.apache.rocketmq.remoting.protocol.RemotingCommand; |
| import org.apache.rocketmq.remoting.protocol.ResponseCode; |
| import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; |
| import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; |
| import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; |
| import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader; |
| import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; |
| import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; |
| import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; |
| import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; |
| import org.apache.rocketmq.store.GetMessageResult; |
| import org.apache.rocketmq.store.GetMessageStatus; |
| import org.apache.rocketmq.store.SelectMappedBufferResult; |
| import org.apache.rocketmq.store.pop.AckMsg; |
| import org.apache.rocketmq.store.pop.BatchAckMsg; |
| import org.apache.rocketmq.store.pop.PopCheckPoint; |
| |
| import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; |
| import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY; |
| import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; |
| import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC; |
| import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE; |
| import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE; |
| import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT; |
| |
| public class PopMessageProcessor implements NettyRequestProcessor { |
| private static final Logger POP_LOGGER = |
| LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); |
| private final BrokerController brokerController; |
| private final Random random = new Random(System.currentTimeMillis()); |
| String reviveTopic; |
| private static final String BORN_TIME = "bornTime"; |
| |
| private final PopLongPollingService popLongPollingService; |
| private final PopBufferMergeService popBufferMergeService; |
| private final QueueLockManager queueLockManager; |
| private final AtomicLong ckMessageNumber; |
| |
| public PopMessageProcessor(final BrokerController brokerController) { |
| this.brokerController = brokerController; |
| this.reviveTopic = PopAckConstants.buildClusterReviveTopic(this.brokerController.getBrokerConfig().getBrokerClusterName()); |
| this.popLongPollingService = new PopLongPollingService(brokerController, this, false); |
| this.queueLockManager = new QueueLockManager(); |
| this.popBufferMergeService = new PopBufferMergeService(this.brokerController, this); |
| this.ckMessageNumber = new AtomicLong(); |
| } |
| |
| public PopLongPollingService getPopLongPollingService() { |
| return popLongPollingService; |
| } |
| |
| public PopBufferMergeService getPopBufferMergeService() { |
| return this.popBufferMergeService; |
| } |
| |
| public QueueLockManager getQueueLockManager() { |
| return queueLockManager; |
| } |
| |
| public static String genAckUniqueId(AckMsg ackMsg) { |
| return ackMsg.getTopic() |
| + PopAckConstants.SPLIT + ackMsg.getQueueId() |
| + PopAckConstants.SPLIT + ackMsg.getAckOffset() |
| + PopAckConstants.SPLIT + ackMsg.getConsumerGroup() |
| + PopAckConstants.SPLIT + ackMsg.getPopTime() |
| + PopAckConstants.SPLIT + ackMsg.getBrokerName() |
| + PopAckConstants.SPLIT + PopAckConstants.ACK_TAG; |
| } |
| |
| public static String genBatchAckUniqueId(BatchAckMsg batchAckMsg) { |
| return batchAckMsg.getTopic() |
| + PopAckConstants.SPLIT + batchAckMsg.getQueueId() |
| + PopAckConstants.SPLIT + batchAckMsg.getAckOffsetList().toString() |
| + PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup() |
| + PopAckConstants.SPLIT + batchAckMsg.getPopTime() |
| + PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG; |
| } |
| |
| public static String genCkUniqueId(PopCheckPoint ck) { |
| return ck.getTopic() |
| + PopAckConstants.SPLIT + ck.getQueueId() |
| + PopAckConstants.SPLIT + ck.getStartOffset() |
| + PopAckConstants.SPLIT + ck.getCId() |
| + PopAckConstants.SPLIT + ck.getPopTime() |
| + PopAckConstants.SPLIT + ck.getBrokerName() |
| + PopAckConstants.SPLIT + PopAckConstants.CK_TAG; |
| } |
| |
| @Override |
| public boolean rejectRequest() { |
| return false; |
| } |
| |
| public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() { |
| return popLongPollingService.getPollingMap(); |
| } |
| |
| public void notifyLongPollingRequestIfNeed(String topic, String group, int queueId) { |
| this.notifyLongPollingRequestIfNeed( |
| topic, group, queueId, null, 0L, null, null); |
| } |
| |
| public void notifyLongPollingRequestIfNeed(String topic, String group, int queueId, |
| Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { |
| long popBufferOffset = this.brokerController.getPopMessageProcessor().getPopBufferMergeService().getLatestOffset(topic, group, queueId); |
| long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId); |
| long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); |
| long offset = Math.max(popBufferOffset, consumerOffset); |
| if (maxOffset > offset) { |
| boolean notifySuccess = popLongPollingService.notifyMessageArriving( |
| topic, -1, group, tagsCode, msgStoreTime, filterBitMap, properties); |
| if (!notifySuccess) { |
| // notify pop queue |
| notifySuccess = popLongPollingService.notifyMessageArriving( |
| topic, queueId, group, tagsCode, msgStoreTime, filterBitMap, properties); |
| } |
| this.brokerController.getNotificationProcessor().notifyMessageArriving(topic, queueId); |
| if (this.brokerController.getBrokerConfig().isEnablePopLog()) { |
| POP_LOGGER.info("notify long polling request. topic:{}, group:{}, queueId:{}, success:{}", |
| topic, group, queueId, notifySuccess); |
| } |
| } |
| } |
| |
| public void notifyMessageArriving(final String topic, final int queueId, |
| Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { |
| popLongPollingService.notifyMessageArrivingWithRetryTopic( |
| topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); |
| } |
| |
| public void notifyMessageArriving(final String topic, final int queueId, final String cid) { |
| popLongPollingService.notifyMessageArriving( |
| topic, queueId, cid, null, 0L, null, null); |
| } |
| |
| @Override |
| public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) |
| throws RemotingCommandException { |
| final long beginTimeMills = this.brokerController.getMessageStore().now(); |
| request.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis())); |
| if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) { |
| request.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis())); |
| } |
| Channel channel = ctx.channel(); |
| |
| RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class); |
| final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader(); |
| final PopMessageRequestHeader requestHeader = |
| (PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class, true); |
| StringBuilder startOffsetInfo = new StringBuilder(64); |
| StringBuilder msgOffsetInfo = new StringBuilder(64); |
| StringBuilder orderCountInfo = null; |
| if (requestHeader.isOrder()) { |
| orderCountInfo = new StringBuilder(64); |
| } |
| |
| brokerController.getConsumerManager().compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), |
| ConsumeType.CONSUME_POP, MessageModel.CLUSTERING); |
| |
| response.setOpaque(request.getOpaque()); |
| |
| if (brokerController.getBrokerConfig().isEnablePopLog()) { |
| POP_LOGGER.info("receive PopMessage request command, {}", request); |
| } |
| |
| if (requestHeader.isTimeoutTooMuch()) { |
| response.setCode(ResponseCode.POLLING_TIMEOUT); |
| response.setRemark(String.format("the broker[%s] pop message is timeout too much", |
| this.brokerController.getBrokerConfig().getBrokerIP1())); |
| return response; |
| } |
| if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { |
| response.setCode(ResponseCode.NO_PERMISSION); |
| response.setRemark(String.format("the broker[%s] pop message is forbidden", |
| this.brokerController.getBrokerConfig().getBrokerIP1())); |
| return response; |
| } |
| if (requestHeader.getMaxMsgNums() > 32) { |
| response.setCode(ResponseCode.SYSTEM_ERROR); |
| response.setRemark(String.format("the broker[%s] pop message's num is greater than 32", |
| this.brokerController.getBrokerConfig().getBrokerIP1())); |
| return response; |
| } |
| |
| if (!brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable()) { |
| response.setCode(ResponseCode.SYSTEM_ERROR); |
| response.setRemark(String.format("the broker[%s] pop message is forbidden because timerWheelEnable is false", |
| this.brokerController.getBrokerConfig().getBrokerIP1())); |
| return response; |
| } |
| |
| TopicConfig topicConfig = |
| this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); |
| if (null == topicConfig) { |
| POP_LOGGER.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), |
| RemotingHelper.parseChannelRemoteAddr(channel)); |
| response.setCode(ResponseCode.TOPIC_NOT_EXIST); |
| response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), |
| FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL))); |
| return response; |
| } |
| |
| if (!PermName.isReadable(topicConfig.getPerm())) { |
| response.setCode(ResponseCode.NO_PERMISSION); |
| response.setRemark("the topic[" + requestHeader.getTopic() + "] peeking message is forbidden"); |
| return response; |
| } |
| |
| if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) { |
| String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] " + |
| "consumer:[%s]", |
| requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), |
| channel.remoteAddress()); |
| POP_LOGGER.warn(errorInfo); |
| response.setCode(ResponseCode.SYSTEM_ERROR); |
| response.setRemark(errorInfo); |
| return response; |
| } |
| SubscriptionGroupConfig subscriptionGroupConfig = |
| this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); |
| if (null == subscriptionGroupConfig) { |
| response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); |
| response.setRemark(String.format("subscription group [%s] does not exist, %s", |
| requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST))); |
| return response; |
| } |
| |
| if (!subscriptionGroupConfig.isConsumeEnable()) { |
| response.setCode(ResponseCode.NO_PERMISSION); |
| response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup()); |
| return response; |
| } |
| |
| BrokerConfig brokerConfig = brokerController.getBrokerConfig(); |
| SubscriptionData subscriptionData = null; |
| ExpressionMessageFilter messageFilter = null; |
| if (requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) { |
| try { |
| subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType()); |
| brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), |
| requestHeader.getTopic(), subscriptionData); |
| |
| String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); |
| SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, SubscriptionData.SUB_ALL, requestHeader.getExpType()); |
| brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), |
| retryTopic, retrySubscriptionData); |
| |
| ConsumerFilterData consumerFilterData = null; |
| if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { |
| consumerFilterData = ConsumerFilterManager.build( |
| requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(), |
| requestHeader.getExpType(), System.currentTimeMillis() |
| ); |
| if (consumerFilterData == null) { |
| POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", |
| requestHeader.getExp(), requestHeader.getConsumerGroup()); |
| response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); |
| response.setRemark("parse the consumer's subscription failed"); |
| return response; |
| } |
| } |
| messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, |
| brokerController.getConsumerFilterManager()); |
| } catch (Exception e) { |
| POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(), |
| requestHeader.getConsumerGroup()); |
| response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); |
| response.setRemark("parse the consumer's subscription failed"); |
| return response; |
| } |
| } else { |
| try { |
| subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG); |
| brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), |
| requestHeader.getTopic(), subscriptionData); |
| |
| String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); |
| SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, "*", ExpressionType.TAG); |
| brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), |
| retryTopic, retrySubscriptionData); |
| } catch (Exception e) { |
| POP_LOGGER.warn("Build default subscription error, group: {}", requestHeader.getConsumerGroup()); |
| } |
| } |
| |
| int randomQ = random.nextInt(100); |
| int reviveQid; |
| if (requestHeader.isOrder()) { |
| reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE; |
| } else { |
| reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum()); |
| } |
| |
| GetMessageResult getMessageResult = new GetMessageResult(requestHeader.getMaxMsgNums()); |
| ExpressionMessageFilter finalMessageFilter = messageFilter; |
| StringBuilder finalOrderCountInfo = orderCountInfo; |
| |
| // Due to the design of the fields startOffsetInfo, msgOffsetInfo, and orderCountInfo, |
| // a single POP request could only invoke the popMsgFromQueue method once |
| // for either a normal topic or a retry topic's queue. Retry topics v1 and v2 are |
| // considered the same type because they share the same retry flag in previous fields. |
| // Therefore, needRetryV1 is designed as a subset of needRetry, and within a single request, |
| // only one type of retry topic is able to call popMsgFromQueue. |
| boolean needRetry = randomQ % 5 == 0; |
| boolean needRetryV1 = false; |
| if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { |
| needRetryV1 = randomQ % 2 == 0; |
| } |
| long popTime = System.currentTimeMillis(); |
| CompletableFuture<Long> getMessageFuture = CompletableFuture.completedFuture(0L); |
| if (needRetry && !requestHeader.isOrder()) { |
| if (needRetryV1) { |
| String retryTopic = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()); |
| getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel, |
| popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); |
| } else { |
| String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); |
| getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel, |
| popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); |
| } |
| } |
| if (requestHeader.getQueueId() < 0) { |
| // read all queue |
| getMessageFuture = popMsgFromTopic(topicConfig, false, getMessageResult, requestHeader, reviveQid, channel, |
| popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); |
| } else { |
| int queueId = requestHeader.getQueueId(); |
| getMessageFuture = getMessageFuture.thenCompose(restNum -> |
| popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), false, |
| getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, |
| startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); |
| } |
| // if not full , fetch retry again |
| if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) { |
| if (needRetryV1) { |
| String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()); |
| getMessageFuture = popMsgFromTopic(retryTopicV1, true, getMessageResult, requestHeader, reviveQid, channel, |
| popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); |
| } else { |
| String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); |
| getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel, |
| popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); |
| } |
| } |
| |
| final RemotingCommand finalResponse = response; |
| SubscriptionData finalSubscriptionData = subscriptionData; |
| getMessageFuture.thenApply(restNum -> { |
| if (!getMessageResult.getMessageBufferList().isEmpty()) { |
| finalResponse.setCode(ResponseCode.SUCCESS); |
| getMessageResult.setStatus(GetMessageStatus.FOUND); |
| if (restNum > 0) { |
| // all queue pop can not notify specified queue pop, and vice versa |
| popLongPollingService.notifyMessageArriving( |
| requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup(), |
| null, 0L, null, null); |
| } |
| } else { |
| PollingResult pollingResult = popLongPollingService.polling( |
| ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter); |
| if (PollingResult.POLLING_SUC == pollingResult) { |
| return null; |
| } else if (PollingResult.POLLING_FULL == pollingResult) { |
| finalResponse.setCode(ResponseCode.POLLING_FULL); |
| } else { |
| finalResponse.setCode(ResponseCode.POLLING_TIMEOUT); |
| } |
| getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE); |
| } |
| responseHeader.setInvisibleTime(requestHeader.getInvisibleTime()); |
| responseHeader.setPopTime(popTime); |
| responseHeader.setReviveQid(reviveQid); |
| responseHeader.setRestNum(restNum); |
| responseHeader.setStartOffsetInfo(startOffsetInfo.toString()); |
| responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString()); |
| if (requestHeader.isOrder() && finalOrderCountInfo != null) { |
| responseHeader.setOrderCountInfo(finalOrderCountInfo.toString()); |
| } |
| finalResponse.setRemark(getMessageResult.getStatus().name()); |
| switch (finalResponse.getCode()) { |
| case ResponseCode.SUCCESS: |
| if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { |
| final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), |
| requestHeader.getTopic(), requestHeader.getQueueId()); |
| this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), |
| requestHeader.getTopic(), requestHeader.getQueueId(), |
| (int) (this.brokerController.getMessageStore().now() - beginTimeMills)); |
| finalResponse.setBody(r); |
| } else { |
| final GetMessageResult tmpGetMessageResult = getMessageResult; |
| try { |
| FileRegion fileRegion = |
| new ManyMessageTransfer(finalResponse.encodeHeader(getMessageResult.getBufferTotalSize()), |
| getMessageResult); |
| channel.writeAndFlush(fileRegion) |
| .addListener((ChannelFutureListener) future -> { |
| tmpGetMessageResult.release(); |
| Attributes attributes = RemotingMetricsManager.newAttributesBuilder() |
| .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(request.getCode())) |
| .put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(finalResponse.getCode())) |
| .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future)) |
| .build(); |
| RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes); |
| if (!future.isSuccess()) { |
| POP_LOGGER.error("Fail to transfer messages from page cache to {}", |
| channel.remoteAddress(), future.cause()); |
| } |
| }); |
| } catch (Throwable e) { |
| POP_LOGGER.error("Error occurred when transferring messages from page cache", e); |
| getMessageResult.release(); |
| } |
| |
| return null; |
| } |
| break; |
| default: |
| return finalResponse; |
| } |
| return finalResponse; |
| }).thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result)); |
| return null; |
| } |
| |
| private CompletableFuture<Long> popMsgFromTopic(TopicConfig topicConfig, boolean isRetry, GetMessageResult getMessageResult, |
| PopMessageRequestHeader requestHeader, int reviveQid, Channel channel, long popTime, |
| ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, |
| StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int randomQ, CompletableFuture<Long> getMessageFuture) { |
| if (topicConfig != null) { |
| for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { |
| int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); |
| getMessageFuture = getMessageFuture.thenCompose(restNum -> |
| popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), isRetry, |
| getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter, |
| startOffsetInfo, msgOffsetInfo, orderCountInfo)); |
| } |
| } |
| return getMessageFuture; |
| } |
| |
| private CompletableFuture<Long> popMsgFromTopic(String topic, boolean isRetry, GetMessageResult getMessageResult, |
| PopMessageRequestHeader requestHeader, int reviveQid, Channel channel, long popTime, |
| ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, |
| StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int randomQ, CompletableFuture<Long> getMessageFuture) { |
| TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); |
| return popMsgFromTopic(topicConfig, isRetry, getMessageResult, requestHeader, reviveQid, channel, popTime, |
| messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); |
| } |
| |
| private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId, boolean isRetry, GetMessageResult getMessageResult, |
| PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid, |
| Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, |
| StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) { |
| String lockKey = |
| topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId; |
| boolean isOrder = requestHeader.isOrder(); |
| long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(), |
| false, lockKey, false); |
| CompletableFuture<Long> future = new CompletableFuture<>(); |
| if (!queueLockManager.tryLock(lockKey)) { |
| restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; |
| future.complete(restNum); |
| return future; |
| } |
| |
| if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) { |
| POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId); |
| restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; |
| future.complete(restNum); |
| return future; |
| } |
| |
| try { |
| future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); |
| offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(), |
| true, lockKey, true); |
| if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic, |
| requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) { |
| future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum); |
| return future; |
| } |
| |
| if (isOrder) { |
| this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum( |
| topic, |
| requestHeader.getConsumerGroup(), |
| queueId |
| ); |
| } |
| |
| if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) { |
| restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; |
| future.complete(restNum); |
| return future; |
| } |
| } catch (Exception e) { |
| POP_LOGGER.error("Exception in popMsgFromQueue", e); |
| future.complete(restNum); |
| return future; |
| } |
| |
| AtomicLong atomicRestNum = new AtomicLong(restNum); |
| AtomicLong atomicOffset = new AtomicLong(offset); |
| long finalOffset = offset; |
| return this.brokerController.getMessageStore() |
| .getMessageAsync(requestHeader.getConsumerGroup(), topic, queueId, offset, |
| requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter) |
| .thenCompose(result -> { |
| if (result == null) { |
| return CompletableFuture.completedFuture(null); |
| } |
| // maybe store offset is not correct. |
| if (GetMessageStatus.OFFSET_TOO_SMALL.equals(result.getStatus()) |
| || GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(result.getStatus()) |
| || GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())) { |
| // commit offset, because the offset is not correct |
| // If offset in store is greater than cq offset, it will cause duplicate messages, |
| // because offset in PopBuffer is not committed. |
| POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}", |
| lockKey, atomicOffset.get(), result.getNextBeginOffset()); |
| this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic, |
| queueId, result.getNextBeginOffset()); |
| atomicOffset.set(result.getNextBeginOffset()); |
| return this.brokerController.getMessageStore().getMessageAsync(requestHeader.getConsumerGroup(), topic, queueId, atomicOffset.get(), |
| requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter); |
| } |
| return CompletableFuture.completedFuture(result); |
| }).thenApply(result -> { |
| if (result == null) { |
| atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - atomicOffset.get() + atomicRestNum.get()); |
| return atomicRestNum.get(); |
| } |
| if (!result.getMessageMapedList().isEmpty()) { |
| this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), result.getMessageCount()); |
| this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), topic, |
| result.getMessageCount()); |
| this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), topic, |
| result.getBufferTotalSize()); |
| |
| Attributes attributes = BrokerMetricsManager.newAttributesBuilder() |
| .put(LABEL_TOPIC, requestHeader.getTopic()) |
| .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup()) |
| .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup())) |
| .put(LABEL_IS_RETRY, isRetry) |
| .build(); |
| BrokerMetricsManager.messagesOutTotal.add(result.getMessageCount(), attributes); |
| BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(), attributes); |
| |
| if (isOrder) { |
| this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(), isRetry, topic, |
| requestHeader.getConsumerGroup(), |
| queueId, popTime, requestHeader.getInvisibleTime(), result.getMessageQueueOffset(), |
| orderCountInfo); |
| this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), |
| requestHeader.getConsumerGroup(), topic, queueId, finalOffset); |
| } else { |
| if (!appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName())) { |
| return atomicRestNum.get() + result.getMessageCount(); |
| } |
| } |
| ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, queueId, finalOffset); |
| ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, queueId, |
| result.getMessageQueueOffset()); |
| } else if ((GetMessageStatus.NO_MATCHED_MESSAGE.equals(result.getStatus()) |
| || GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus()) |
| || GetMessageStatus.MESSAGE_WAS_REMOVING.equals(result.getStatus()) |
| || GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(result.getStatus())) |
| && result.getNextBeginOffset() > -1) { |
| if (isOrder) { |
| this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic, |
| queueId, result.getNextBeginOffset()); |
| } else { |
| popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic, queueId, finalOffset, |
| requestHeader.getInvisibleTime(), popTime, reviveQid, result.getNextBeginOffset(), brokerController.getBrokerConfig().getBrokerName()); |
| } |
| } |
| |
| atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get()); |
| String brokerName = brokerController.getBrokerConfig().getBrokerName(); |
| for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) { |
| // We should not recode buffer when popResponseReturnActualRetryTopic is true or topic is not retry topic |
| if (brokerController.getBrokerConfig().isPopResponseReturnActualRetryTopic() || !isRetry) { |
| getMessageResult.addMessage(mapedBuffer); |
| } else { |
| List<MessageExt> messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(), |
| true, false, true); |
| mapedBuffer.release(); |
| for (MessageExt messageExt : messageExtList) { |
| try { |
| String ckInfo = ExtraInfoUtil.buildExtraInfo(finalOffset, popTime, requestHeader.getInvisibleTime(), |
| reviveQid, messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset()); |
| messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo); |
| |
| // Set retry message topic to origin topic and clear message store size to recode |
| messageExt.setTopic(requestHeader.getTopic()); |
| messageExt.setStoreSize(0); |
| |
| byte[] encode = MessageDecoder.encode(messageExt, false); |
| ByteBuffer buffer = ByteBuffer.wrap(encode); |
| SelectMappedBufferResult tmpResult = |
| new SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length, null); |
| getMessageResult.addMessage(tmpResult); |
| } catch (Exception e) { |
| POP_LOGGER.error("Exception in recode retry message buffer, topic={}", topic, e); |
| } |
| } |
| } |
| } |
| this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum( |
| topic, |
| requestHeader.getConsumerGroup(), |
| queueId, |
| result.getMessageCount() |
| ); |
| return atomicRestNum.get(); |
| }).whenComplete((result, throwable) -> { |
| if (throwable != null) { |
| POP_LOGGER.error("Pop message error, {}", lockKey, throwable); |
| } |
| queueLockManager.unLock(lockKey); |
| }); |
| } |
| |
| private boolean isPopShouldStop(String topic, String group, int queueId) { |
| return brokerController.getBrokerConfig().isEnablePopMessageThreshold() && |
| brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic, group, queueId) > brokerController.getBrokerConfig().getPopInflightMessageThreshold(); |
| } |
| |
| private long getPopOffset(String topic, String group, int queueId, int initMode, boolean init, String lockKey, |
| boolean checkResetOffset) { |
| |
| long offset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId); |
| if (offset < 0) { |
| offset = this.getInitOffset(topic, group, queueId, initMode, init); |
| } |
| |
| if (checkResetOffset) { |
| Long resetOffset = resetPopOffset(topic, group, queueId); |
| if (resetOffset != null) { |
| return resetOffset; |
| } |
| } |
| |
| long bufferOffset = this.popBufferMergeService.getLatestOffset(lockKey); |
| if (bufferOffset < 0) { |
| return offset; |
| } else { |
| return Math.max(bufferOffset, offset); |
| } |
| } |
| |
| private long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) { |
| long offset; |
| if (ConsumeInitMode.MIN == initMode) { |
| return this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); |
| } else { |
| if (this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() && |
| this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <= 0 && |
| this.brokerController.getMessageStore().checkInMemByConsumeOffset(topic, queueId, 0, 1)) { |
| offset = 0; |
| } else { |
| // pop last one,then commit offset. |
| offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1; |
| // max & no consumer offset |
| if (offset < 0) { |
| offset = 0; |
| } |
| } |
| if (init) { |
| this.brokerController.getConsumerOffsetManager().commitOffset( |
| "getPopOffset", group, topic, queueId, offset); |
| } |
| } |
| return offset; |
| } |
| |
| public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, final int reviveQid) { |
| MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); |
| |
| msgInner.setTopic(reviveTopic); |
| msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.CHARSET_UTF8)); |
| msgInner.setQueueId(reviveQid); |
| msgInner.setTags(PopAckConstants.CK_TAG); |
| msgInner.setBornTimestamp(System.currentTimeMillis()); |
| msgInner.setBornHost(this.brokerController.getStoreHost()); |
| msgInner.setStoreHost(this.brokerController.getStoreHost()); |
| msgInner.setDeliverTimeMs(ck.getReviveTime() - PopAckConstants.ackTimeInterval); |
| msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, genCkUniqueId(ck)); |
| msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); |
| |
| return msgInner; |
| } |
| |
| private boolean appendCheckPoint(final PopMessageRequestHeader requestHeader, |
| final String topic, final int reviveQid, final int queueId, final long offset, |
| final GetMessageResult getMessageTmpResult, final long popTime, final String brokerName) { |
| // add check point msg to revive log |
| final PopCheckPoint ck = new PopCheckPoint(); |
| ck.setBitMap(0); |
| ck.setNum((byte) getMessageTmpResult.getMessageMapedList().size()); |
| ck.setPopTime(popTime); |
| ck.setInvisibleTime(requestHeader.getInvisibleTime()); |
| ck.setStartOffset(offset); |
| ck.setCId(requestHeader.getConsumerGroup()); |
| ck.setTopic(topic); |
| ck.setQueueId(queueId); |
| ck.setBrokerName(brokerName); |
| for (Long msgQueueOffset : getMessageTmpResult.getMessageQueueOffset()) { |
| ck.addDiff((int) (msgQueueOffset - offset)); |
| } |
| |
| final boolean addBufferSuc = this.popBufferMergeService.addCk( |
| ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset() |
| ); |
| |
| if (addBufferSuc) { |
| return true; |
| } |
| return this.popBufferMergeService.addCkJustOffset( |
| ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset() |
| ); |
| } |
| |
| private Long resetPopOffset(String topic, String group, int queueId) { |
| String lockKey = topic + PopAckConstants.SPLIT + group + PopAckConstants.SPLIT + queueId; |
| Long resetOffset = |
| this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topic, group, queueId); |
| if (resetOffset != null) { |
| this.brokerController.getConsumerOrderInfoManager().clearBlock(topic, group, queueId); |
| this.getPopBufferMergeService().clearOffsetQueue(lockKey); |
| this.brokerController.getConsumerOffsetManager() |
| .commitOffset("ResetPopOffset", group, topic, queueId, resetOffset); |
| } |
| return resetOffset; |
| } |
| |
| private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, |
| final int queueId) { |
| final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize()); |
| |
| long storeTimestamp = 0; |
| try { |
| List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList(); |
| for (ByteBuffer bb : messageBufferList) { |
| |
| byteBuffer.put(bb); |
| storeTimestamp = bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION); |
| } |
| } finally { |
| getMessageResult.release(); |
| } |
| |
| this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, |
| this.brokerController.getMessageStore().now() - storeTimestamp); |
| return byteBuffer.array(); |
| } |
| |
| static class TimedLock { |
| private final AtomicBoolean lock; |
| private volatile long lockTime; |
| |
| public TimedLock() { |
| this.lock = new AtomicBoolean(true); |
| this.lockTime = System.currentTimeMillis(); |
| } |
| |
| public boolean tryLock() { |
| boolean ret = lock.compareAndSet(true, false); |
| if (ret) { |
| this.lockTime = System.currentTimeMillis(); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| public void unLock() { |
| lock.set(true); |
| } |
| |
| public boolean isLock() { |
| return !lock.get(); |
| } |
| |
| public long getLockTime() { |
| return lockTime; |
| } |
| } |
| |
| public class QueueLockManager extends ServiceThread { |
| private final ConcurrentHashMap<String, TimedLock> expiredLocalCache = new ConcurrentHashMap<>(100000); |
| |
| public String buildLockKey(String topic, String consumerGroup, int queueId) { |
| return topic + PopAckConstants.SPLIT + consumerGroup + PopAckConstants.SPLIT + queueId; |
| } |
| |
| public boolean tryLock(String topic, String consumerGroup, int queueId) { |
| return tryLock(buildLockKey(topic, consumerGroup, queueId)); |
| } |
| |
| public boolean tryLock(String key) { |
| TimedLock timedLock = expiredLocalCache.get(key); |
| |
| if (timedLock == null) { |
| TimedLock old = expiredLocalCache.putIfAbsent(key, new TimedLock()); |
| if (old != null) { |
| return false; |
| } else { |
| timedLock = expiredLocalCache.get(key); |
| } |
| } |
| |
| if (timedLock == null) { |
| return false; |
| } |
| |
| return timedLock.tryLock(); |
| } |
| |
| /** |
| * is not thread safe, may cause duplicate lock |
| * |
| * @param usedExpireMillis the expired time in millisecond |
| * @return total numbers of TimedLock |
| */ |
| public int cleanUnusedLock(final long usedExpireMillis) { |
| Iterator<Entry<String, TimedLock>> iterator = expiredLocalCache.entrySet().iterator(); |
| |
| int total = 0; |
| while (iterator.hasNext()) { |
| Entry<String, TimedLock> entry = iterator.next(); |
| |
| if (System.currentTimeMillis() - entry.getValue().getLockTime() > usedExpireMillis) { |
| iterator.remove(); |
| POP_LOGGER.info("Remove unused queue lock: {}, {}, {}", entry.getKey(), |
| entry.getValue().getLockTime(), |
| entry.getValue().isLock()); |
| } |
| |
| total++; |
| } |
| |
| return total; |
| } |
| |
| public void unLock(String topic, String consumerGroup, int queueId) { |
| unLock(buildLockKey(topic, consumerGroup, queueId)); |
| } |
| |
| public void unLock(String key) { |
| TimedLock timedLock = expiredLocalCache.get(key); |
| if (timedLock != null) { |
| timedLock.unLock(); |
| } |
| } |
| |
| @Override |
| public String getServiceName() { |
| if (PopMessageProcessor.this.brokerController.getBrokerConfig().isInBrokerContainer()) { |
| return PopMessageProcessor.this.brokerController.getBrokerIdentity().getIdentifier() + QueueLockManager.class.getSimpleName(); |
| } |
| return QueueLockManager.class.getSimpleName(); |
| } |
| |
| @Override |
| public void run() { |
| while (!isStopped()) { |
| try { |
| this.waitForRunning(60000); |
| int count = cleanUnusedLock(60000); |
| POP_LOGGER.info("QueueLockSize={}", count); |
| } catch (Exception e) { |
| PopMessageProcessor.POP_LOGGER.error("QueueLockManager run error", e); |
| } |
| } |
| } |
| } |
| } |