blob: f3e3617d376ff34200e0704667bdd2e88969d2c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.runtimer.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* @Author changfeng
* @Date 2023/4/9 10:06 上午
*/
public class LocalMessageCache {
private static final Logger log = LoggerFactory.getLogger(LocalMessageCache.class);
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
private final Map<String, ConsumeRequest> consumedRequest;
private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final ClientConfig clientConfig;
LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) {
consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity());
this.consumedRequest = new ConcurrentHashMap<>();
this.pullOffsetTable = new ConcurrentHashMap<>();
this.rocketmqPullConsumer = rocketmqPullConsumer;
this.clientConfig = clientConfig;
}
int nextPullBatchNums() {
return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity());
}
long nextPullOffset(MessageQueue remoteQueue) {
final AtomicReference<RuntimeException> outerException = new AtomicReference<>();
final Long existsOffset = pullOffsetTable.computeIfAbsent(remoteQueue, messageQueue -> {
try {
// Got -1 when MQBrokerException occurred, aka broker returns non-0 code, e.g. ResponseCode.QUERY_NOT_FOUND
// or any runtime exception thrown from rpc hook.
// Got -2 when other exception occurred, include broker not exists, network or client exception
long offset = rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false);
if (offset == -2) {
outerException.set(new RuntimeException("fetchConsumeOffsetFromBroker exception, please check rocketmq client for more details"));
return null;
}
if (offset == -1 || offset == 0) {
// Follow the CONSUME_FROM_WHERE to compute next pull offset
// But note that if broker thrown any unexpected runtime exception may cause offset rollback.
// We don't handle this risk because of MetaQ doesn't have any rpc hook
final ConsumeFromWhere fromWhere = clientConfig.getConsumeFromWhere();
switch (fromWhere) {
case CONSUME_FROM_LAST_OFFSET:
offset = this.rocketmqPullConsumer.maxOffset(remoteQueue);
break;
case CONSUME_FROM_FIRST_OFFSET:
offset = this.rocketmqPullConsumer.minOffset(remoteQueue);
break;
case CONSUME_FROM_TIMESTAMP:
offset = this.rocketmqPullConsumer.searchOffset(remoteQueue, clientConfig.getConsumeTimestamp());
break;
}
}
if (offset >= 0) {
// Got an offset from offset store
return offset;
}
// Maybe wrong ConsumeFromWhere configured, so couldn't find any offset
return null;
} catch (Exception e) {
outerException.set(new RuntimeException(e));
return null;
}
});
if (outerException.get() != null) {
throw outerException.get();
}
if (existsOffset == null) {
final String errorMsg = "[BUG] No offset found in offset store or pullOffsetTable without any exception";
log.warn(errorMsg);
throw new RuntimeException(errorMsg);
}
return existsOffset;
}
void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
pullOffsetTable.put(remoteQueue, nextPullOffset);
}
void removePullOffset(MessageQueue remoteQueue) {
pullOffsetTable.remove(remoteQueue);
}
void shrinkPullOffsetTable(Set<MessageQueue> mqDivided) {
pullOffsetTable.entrySet().removeIf(next -> !mqDivided.contains(next.getKey()));
}
boolean submitConsumeRequest(ConsumeRequest consumeRequest, Long timeout) {
try {
return consumeRequestCache.offer(consumeRequest, timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("put consumeRequestCache failed", e);
}
return false;
}
public List<MessageExt> poll(final int pullBatchSize, final Duration timeout) {
List<MessageExt> messageList = new ArrayList<>();
try {
List<ConsumeRequest> consumeRequestList = new ArrayList<>();
consumeRequestCache.drainTo(consumeRequestList, pullBatchSize);
if (consumeRequestList.size() == 0) {
final ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (consumeRequest != null) {
consumeRequestList.add(consumeRequest);
// drainTo again
consumeRequestCache.drainTo(consumeRequestList, pullBatchSize - 1);
}
}
for (final ConsumeRequest consumeRequest : consumeRequestList) {
MessageExt messageExt = consumeRequest.getMessageExt();
consumedRequest.put(messageExt.getMsgId(), consumeRequest);
messageList.add(messageExt);
}
} catch (InterruptedException e) {
log.warn("Poll from local cache interrupted.", e);
}
return messageList;
}
public void commit(final List<String> messageList) {
for (final String msgId : messageList) {
ConsumeRequest consumeRequest = consumedRequest.remove(msgId);
if (consumeRequest != null) {
long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt()));
try {
rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
} catch (MQClientException e) {
log.error("A error occurred in update consume offset process.", e);
}
}
}
if (clientConfig.isCommitSync()) {
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().persistConsumerOffset();
}
}
}