| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.rocketmq.store; |
| |
| import com.google.common.hash.Hashing; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.net.Inet6Address; |
| import java.net.InetSocketAddress; |
| import java.net.SocketAddress; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileLock; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.rocketmq.common.AbstractBrokerRunnable; |
| import org.apache.rocketmq.common.BrokerConfig; |
| import org.apache.rocketmq.common.BrokerIdentity; |
| import org.apache.rocketmq.common.MixAll; |
| import org.apache.rocketmq.common.ServiceThread; |
| import org.apache.rocketmq.common.SystemClock; |
| import org.apache.rocketmq.common.ThreadFactoryImpl; |
| import org.apache.rocketmq.common.TopicConfig; |
| import org.apache.rocketmq.common.UtilAll; |
| import org.apache.rocketmq.common.attribute.CQType; |
| import org.apache.rocketmq.common.constant.LoggerName; |
| import org.apache.rocketmq.common.message.MessageConst; |
| import org.apache.rocketmq.common.message.MessageDecoder; |
| import org.apache.rocketmq.common.message.MessageExt; |
| import org.apache.rocketmq.common.message.MessageExtBatch; |
| import org.apache.rocketmq.common.message.MessageExtBrokerInner; |
| import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; |
| import org.apache.rocketmq.common.running.RunningStats; |
| import org.apache.rocketmq.common.sysflag.MessageSysFlag; |
| import org.apache.rocketmq.common.topic.TopicValidator; |
| import org.apache.rocketmq.common.utils.QueueTypeUtils; |
| import org.apache.rocketmq.common.utils.ServiceProvider; |
| import org.apache.rocketmq.logging.InternalLogger; |
| import org.apache.rocketmq.logging.InternalLoggerFactory; |
| import org.apache.rocketmq.store.config.BrokerRole; |
| import org.apache.rocketmq.store.config.FlushDiskType; |
| import org.apache.rocketmq.store.config.MessageStoreConfig; |
| import org.apache.rocketmq.store.config.StorePathConfigHelper; |
| import org.apache.rocketmq.store.dledger.DLedgerCommitLog; |
| import org.apache.rocketmq.store.ha.DefaultHAService; |
| import org.apache.rocketmq.store.ha.HAService; |
| import org.apache.rocketmq.store.hook.PutMessageHook; |
| import org.apache.rocketmq.store.hook.SendMessageBackHook; |
| import org.apache.rocketmq.store.index.IndexService; |
| import org.apache.rocketmq.store.index.QueryOffsetResult; |
| import org.apache.rocketmq.store.logfile.MappedFile; |
| import org.apache.rocketmq.store.queue.ConsumeQueueInterface; |
| import org.apache.rocketmq.store.queue.ConsumeQueueStore; |
| import org.apache.rocketmq.store.queue.CqUnit; |
| import org.apache.rocketmq.store.queue.ReferredIterator; |
| import org.apache.rocketmq.store.stats.BrokerStatsManager; |
| import org.apache.rocketmq.store.util.PerfCounter; |
| |
public class DefaultMessageStore implements MessageStore {
    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    // Performance counters shared with the store's sub-services.
    public final PerfCounter.Ticks perfs = new PerfCounter.Ticks(LOGGER);

    private final MessageStoreConfig messageStoreConfig;
    // CommitLog
    private final CommitLog commitLog;

    // Manages all consume queues (logic queues) built from the commit log.
    private final ConsumeQueueStore consumeQueueStore;

    // Background service that periodically flushes consume queues to disk.
    private final FlushConsumeQueueService flushConsumeQueueService;

    // Background service that deletes expired commit log files.
    private final CleanCommitLogService cleanCommitLogService;

    // Background service that deletes consume queue files made obsolete by commit log cleanup.
    private final CleanConsumeQueueService cleanConsumeQueueService;

    private final CorrectLogicOffsetService correctLogicOffsetService;

    // Message index (by key / time range) service.
    private final IndexService indexService;

    // Pre-allocates mapped files so writes never wait for file creation.
    private final AllocateMappedFileService allocateMappedFileService;

    // Dispatches commit log entries to consume queues and index; recreated by truncateDirtyFiles.
    private ReputMessageService reputMessageService;

    // Master/slave replication service; null when DLedger or duplication is enabled.
    private HAService haService;

    private final StoreStatsService storeStatsService;

    // Pool of off-heap buffers used when transientStorePoolEnable is set.
    private final TransientStorePool transientStorePool;

    private final RunningFlags runningFlags = new RunningFlags();
    private final SystemClock systemClock = new SystemClock();

    private final ScheduledExecutorService scheduledExecutorService;
    private final BrokerStatsManager brokerStatsManager;
    private final MessageArrivingListener messageArrivingListener;
    private final BrokerConfig brokerConfig;

    // true until start() completes; getMessage refuses to serve while shutdown.
    private volatile boolean shutdown = true;

    private StoreCheckpoint storeCheckpoint;

    private AtomicLong printTimes = new AtomicLong(0);

    // Ordered list of dispatchers run for every commit log entry (consume queue first, then index).
    private final LinkedList<CommitLogDispatcher> dispatcherList;

    private RandomAccessFile lockFile;

    // Exclusive file lock preventing two brokers from sharing one store directory.
    private FileLock lock;

    boolean shutDownNormal = false;
    // Max pull msg size
    private final static int MAX_PULL_MSG_SIZE = 128 * 1024 * 1024;

    private volatile int aliveReplicasNum = 1;

    // Refer the MessageStore of MasterBroker in the same process.
    // If current broker is master, this reference point to null or itself.
    // If current broker is slave, this reference point to the store of master broker, and the two stores belong to
    // different broker groups.
    private MessageStore masterStoreInProcess = null;

    private volatile long masterFlushedOffset = -1L;

    // Max commit log offset observed when the broker finished loading (see load()).
    private volatile long brokerInitMaxOffset = -1L;

    // Hooks executed before every put; a non-null result short-circuits the put.
    protected List<PutMessageHook> putMessageHookList = new ArrayList<>();

    private SendMessageBackHook sendMessageBackHook;

    // Delay level -> delay duration in millis, parsed from messageDelayLevel config.
    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);

    private int maxDelayLevel;
| |
    /**
     * Build the message store and wire up all sub-services. Nothing is started here;
     * call {@link #load()} and then {@link #start()} afterwards.
     *
     * @param messageStoreConfig      store-level configuration (paths, flush policy, replicas, ...)
     * @param brokerStatsManager      broker statistics collector (may be used by get/put paths)
     * @param messageArrivingListener notified when new messages are dispatched (long-polling wakeup)
     * @param brokerConfig            broker-level configuration (identity, timer threads, ...)
     * @throws IOException if the store lock file cannot be opened
     */
    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        this.messageArrivingListener = messageArrivingListener;
        this.brokerConfig = brokerConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.aliveReplicasNum = messageStoreConfig.getTotalReplicas();
        this.brokerStatsManager = brokerStatsManager;
        this.allocateMappedFileService = new AllocateMappedFileService(this);
        // DLedger replaces the plain commit log when raft-based replication is enabled.
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        this.consumeQueueStore = new ConsumeQueueStore(this, this.messageStoreConfig);

        this.flushConsumeQueueService = new FlushConsumeQueueService();
        this.cleanCommitLogService = new CleanCommitLogService();
        this.cleanConsumeQueueService = new CleanConsumeQueueService();
        this.correctLogicOffsetService = new CorrectLogicOffsetService();
        this.storeStatsService = new StoreStatsService(getBrokerIdentity());
        this.indexService = new IndexService(this);
        // HA is only meaningful for the classic commit log without duplication.
        if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
            this.haService = ServiceProvider.loadClass(ServiceProvider.HA_SERVICE_ID, HAService.class);
            if (null == this.haService) {
                this.haService = new DefaultHAService();
                LOGGER.warn("Load default HA Service: {}", DefaultHAService.class.getSimpleName());
            }
        }

        this.reputMessageService = new ReputMessageService();

        this.transientStorePool = new TransientStorePool(messageStoreConfig);

        this.scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));

        // Dispatch order matters: consume queue entries must exist before the index references them.
        this.dispatcherList = new LinkedList<>();
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
        UtilAll.ensureDirOK(file.getParent());
        UtilAll.ensureDirOK(getStorePathPhysic());
        UtilAll.ensureDirOK(getStorePathLogic());
        // The lock itself is acquired in start(); only the file handle is opened here.
        lockFile = new RandomAccessFile(file, "rw");

        parseDelayLevel();
    }
| |
| public boolean parseDelayLevel() { |
| HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(); |
| timeUnitTable.put("s", 1000L); |
| timeUnitTable.put("m", 1000L * 60); |
| timeUnitTable.put("h", 1000L * 60 * 60); |
| timeUnitTable.put("d", 1000L * 60 * 60 * 24); |
| |
| String levelString = messageStoreConfig.getMessageDelayLevel(); |
| try { |
| String[] levelArray = levelString.split(" "); |
| for (int i = 0; i < levelArray.length; i++) { |
| String value = levelArray[i]; |
| String ch = value.substring(value.length() - 1); |
| Long tu = timeUnitTable.get(ch); |
| |
| int level = i + 1; |
| if (level > this.maxDelayLevel) { |
| this.maxDelayLevel = level; |
| } |
| long num = Long.parseLong(value.substring(0, value.length() - 1)); |
| long delayTimeMillis = tu * num; |
| this.delayLevelTable.put(level, delayTimeMillis); |
| } |
| } catch (Exception e) { |
| LOGGER.error("parseDelayLevel exception", e); |
| LOGGER.info("levelString String = {}", levelString); |
| return false; |
| } |
| |
| return true; |
| } |
| |
    /**
     * Truncate every consume queue so no entry references a commit log position
     * at or beyond {@code phyOffset}. Used during recovery/truncation.
     */
    @Override
    public void truncateDirtyLogicFiles(long phyOffset) {
        this.consumeQueueStore.truncateDirty(phyOffset);
    }
| |
| /** |
| * @throws IOException |
| */ |
| @Override |
| public boolean load() { |
| boolean result = true; |
| |
| try { |
| boolean lastExitOK = !this.isTempFileExist(); |
| LOGGER.info("last shutdown {}, root dir: {}", lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir()); |
| |
| // load Commit Log |
| result = result && this.commitLog.load(); |
| |
| // load Consume Queue |
| result = result && this.consumeQueueStore.load(); |
| |
| if (result) { |
| this.storeCheckpoint = |
| new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); |
| this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset(); |
| this.indexService.load(lastExitOK); |
| |
| this.recover(lastExitOK); |
| |
| LOGGER.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); |
| } |
| |
| long maxOffset = this.getMaxPhyOffset(); |
| this.setBrokerInitMaxOffset(maxOffset); |
| LOGGER.info("load over, and the max phy offset = {}", maxOffset); |
| } catch (Exception e) { |
| LOGGER.error("load exception", e); |
| result = false; |
| } |
| |
| if (!result) { |
| this.allocateMappedFileService.shutdown(); |
| } |
| |
| return result; |
| } |
| |
    /**
     * Start all store services. Ordering is deliberate: the reput service must be
     * running (and caught up, see {@link #doRecheckReputOffsetFromCq}) before the
     * commit log starts accepting writes.
     *
     * @throws Exception if the exclusive store lock cannot be acquired or any
     *                   sub-service fails to start
     */
    @Override
    public void start() throws Exception {
        if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
            this.haService.init(this);
        }

        if (messageStoreConfig.isTransientStorePoolEnable()) {
            this.transientStorePool.init();
        }

        this.allocateMappedFileService.start();

        this.indexService.start();

        // Exclusive byte-range lock: ensures only one broker process uses this store directory.
        lock = lockFile.getChannel().tryLock(0, 1, false);
        if (lock == null || lock.isShared() || !lock.isValid()) {
            throw new RuntimeException("Lock failed,MQ already started");
        }

        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
        lockFile.getChannel().force(true);

        if (this.getMessageStoreConfig().isDuplicationEnable()) {
            this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
        } else {
            // It is [recover]'s responsibility to fully dispatch the commit log data before the max offset of commit log.
            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
        }
        this.reputMessageService.start();

        // Checking is not necessary, as long as the dLedger's implementation exactly follows the definition of Recover,
        // which is eliminating the dispatch inconsistency between the commitLog and consumeQueue at the end of recovery.
        this.doRecheckReputOffsetFromCq();

        this.flushConsumeQueueService.start();
        this.commitLog.start();
        this.storeStatsService.start();

        if (this.haService != null) {
            this.haService.start();
        }

        // Abort file exists while running; its survival across restart marks an unclean exit.
        this.createTempFile();
        this.addScheduleTask();
        this.perfs.start();
        this.shutdown = false;
    }
| |
    /**
     * Optionally (config {@code recheckReputOffsetFromCq}) recompute the reput start
     * offset from the consume queues' max physical offsets, then block until all
     * fall-behind commit log data has been dispatched, and finally rebuild the
     * topic queue table. Called from {@link #start()} before the commit log starts.
     *
     * @throws InterruptedException if interrupted while waiting for dispatch to catch up
     */
    private void doRecheckReputOffsetFromCq() throws InterruptedException {
        if (!messageStoreConfig.isRecheckReputOffsetFromCq()) {
            return;
        }

        /**
         * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
         * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
         * 3. Calculate the reput offset according to the consume queue;
         * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
         */
        // Start from the commit log's min offset, then raise it to the largest physical
        // position any consume queue has already dispatched up to.
        long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
            for (ConsumeQueueInterface logic : maps.values()) {
                if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                    maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                }
            }
        }
        // If maxPhyPos(CQs) < minPhyPos(CommitLog), some newly deleted topics may be re-dispatched into cqs mistakenly.
        if (maxPhysicalPosInLogicQueue < 0) {
            maxPhysicalPosInLogicQueue = 0;
        }
        if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
            maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
            /**
             * This happens in following conditions:
             * 1. If someone removes all the consumequeue files or the disk get damaged.
             * 2. Launch a new broker, and copy the commitlog from other brokers.
             *
             * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
             * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
             */
            LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
        }
        LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
            maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
        this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);

        /**
         * 1. Finish dispatching the messages fall behind, then to start other services.
         * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
         */
        while (true) {
            if (dispatchBehindBytes() <= 0) {
                break;
            }
            Thread.sleep(1000);
            LOGGER.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
        }
        this.recoverTopicQueueTable();
    }
| |
| @Override |
| public void shutdown() { |
| if (!this.shutdown) { |
| this.shutdown = true; |
| |
| this.scheduledExecutorService.shutdown(); |
| try { |
| |
| Thread.sleep(1000 * 3); |
| } catch (InterruptedException e) { |
| LOGGER.error("shutdown Exception, ", e); |
| } |
| |
| if (this.haService != null) { |
| this.haService.shutdown(); |
| } |
| |
| this.storeStatsService.shutdown(); |
| this.indexService.shutdown(); |
| this.commitLog.shutdown(); |
| this.reputMessageService.shutdown(); |
| this.flushConsumeQueueService.shutdown(); |
| this.allocateMappedFileService.shutdown(); |
| this.storeCheckpoint.flush(); |
| this.storeCheckpoint.shutdown(); |
| |
| this.perfs.shutdown(); |
| |
| if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) { |
| this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())); |
| shutDownNormal = true; |
| } else { |
| LOGGER.warn("the store may be wrong, so shutdown abnormally, and keep abort file."); |
| } |
| } |
| |
| this.transientStorePool.destroy(); |
| |
| if (lockFile != null && lock != null) { |
| try { |
| lock.release(); |
| lockFile.close(); |
| } catch (IOException e) { |
| } |
| } |
| } |
| |
| @Override |
| public void destroy() { |
| this.destroyLogics(); |
| this.commitLog.destroy(); |
| this.indexService.destroy(); |
| this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())); |
| this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); |
| } |
| |
| public long getMajorFileSize() { |
| long commitLogSize = 0; |
| if (this.commitLog != null) { |
| commitLogSize = this.commitLog.getTotalSize(); |
| } |
| |
| long consumeQueueSize = 0; |
| if (this.consumeQueueStore != null) { |
| consumeQueueSize = this.consumeQueueStore.getTotalSize(); |
| } |
| |
| long indexFileSize = 0; |
| if (this.indexService != null) { |
| indexFileSize = this.indexService.getTotalSize(); |
| } |
| |
| return commitLogSize + consumeQueueSize + indexFileSize; |
| } |
| |
    /**
     * Delete all consume queue (logic queue) files.
     */
    @Override
    public void destroyLogics() {
        this.consumeQueueStore.destroy();
    }
| |
| @Override |
| public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) { |
| |
| for (PutMessageHook putMessageHook : putMessageHookList) { |
| PutMessageResult handleResult = putMessageHook.executeBeforePutMessage(msg); |
| if (handleResult != null) { |
| return CompletableFuture.completedFuture(handleResult); |
| } |
| } |
| |
| if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM) |
| && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) { |
| LOGGER.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM); |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); |
| } |
| |
| if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) { |
| Optional<TopicConfig> topicConfig = this.getTopicConfig(msg.getTopic()); |
| if (!QueueTypeUtils.isBatchCq(topicConfig)) { |
| LOGGER.error("[BUG]The message is an inner batch but cq type is not batch cq"); |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); |
| } |
| } |
| |
| long beginTime = this.getSystemClock().now(); |
| CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg); |
| |
| putResultFuture.thenAccept(result -> { |
| long elapsedTime = this.getSystemClock().now() - beginTime; |
| if (elapsedTime > 500) { |
| LOGGER.warn("DefaultMessageStore#putMessage: CommitLog#putMessage cost {}ms, topic={}, bodyLength={}", |
| msg.getTopic(), msg.getBody().length); |
| } |
| this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); |
| |
| if (null == result || !result.isOk()) { |
| this.storeStatsService.getPutMessageFailedTimes().add(1); |
| } |
| }); |
| |
| return putResultFuture; |
| } |
| |
| @Override |
| public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) { |
| |
| for (PutMessageHook putMessageHook : putMessageHookList) { |
| PutMessageResult handleResult = putMessageHook.executeBeforePutMessage(messageExtBatch); |
| if (handleResult != null) { |
| return CompletableFuture.completedFuture(handleResult); |
| } |
| } |
| |
| long beginTime = this.getSystemClock().now(); |
| CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessages(messageExtBatch); |
| |
| putResultFuture.thenAccept(result -> { |
| long eclipseTime = this.getSystemClock().now() - beginTime; |
| if (eclipseTime > 500) { |
| LOGGER.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length); |
| } |
| this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); |
| |
| if (null == result || !result.isOk()) { |
| this.storeStatsService.getPutMessageFailedTimes().add(1); |
| } |
| }); |
| |
| return putResultFuture; |
| } |
| |
    /**
     * Synchronous variant of {@link #asyncPutMessage}: blocks until the put
     * completes or times out (see waitForPutResult).
     */
    @Override
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        return waitForPutResult(asyncPutMessage(msg));
    }
| |
    /**
     * Synchronous variant of {@link #asyncPutMessages}: blocks until the batch put
     * completes or times out (see waitForPutResult).
     */
    @Override
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        return waitForPutResult(asyncPutMessages(messageExtBatch));
    }
| |
| private PutMessageResult waitForPutResult(CompletableFuture<PutMessageResult> putMessageResultFuture) { |
| try { |
| int putMessageTimeout = |
| Math.max(this.messageStoreConfig.getSyncFlushTimeout(), |
| this.messageStoreConfig.getSlaveTimeout()) + 5000; |
| return putMessageResultFuture.get(putMessageTimeout, TimeUnit.MILLISECONDS); |
| } catch (ExecutionException | InterruptedException e) { |
| return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null); |
| } catch (TimeoutException e) { |
| LOGGER.error("usually it will never timeout, putMessageTimeout is much bigger than slaveTimeout and " |
| + "flushTimeout so the result can be got anyway, but in some situations timeout will happen like full gc " |
| + "process hangs or other unexpected situations."); |
| return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null); |
| } |
| } |
| |
| @Override |
| public boolean isOSPageCacheBusy() { |
| long begin = this.getCommitLog().getBeginTimeInLock(); |
| long diff = this.systemClock.now() - begin; |
| |
| return diff < 10000000 |
| && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); |
| } |
| |
    /**
     * How long (ms) the current commit log put has been holding the write lock.
     */
    @Override
    public long lockTimeMills() {
        return this.commitLog.lockTimeMills();
    }
| |
    /**
     * Last known flushed offset of the master broker (-1 until loaded/set).
     */
    @Override
    public long getMasterFlushedOffset() {
        return this.masterFlushedOffset;
    }
| |
    /**
     * Update the master flushed offset and persist it into the store checkpoint.
     */
    @Override
    public void setMasterFlushedOffset(long masterFlushedOffset) {
        this.masterFlushedOffset = masterFlushedOffset;
        this.storeCheckpoint.setMasterFlushedOffset(masterFlushedOffset);
    }
| |
    /**
     * Max commit log physical offset recorded when the broker finished loading.
     */
    @Override
    public long getBrokerInitMaxOffset() {
        return this.brokerInitMaxOffset;
    }
| |
    /**
     * Record the max commit log physical offset observed at load time (see {@link #load()}).
     */
    @Override
    public void setBrokerInitMaxOffset(long brokerInitMaxOffset) {
        this.brokerInitMaxOffset = brokerInitMaxOffset;
    }
| |
    // Clock used for all elapsed-time measurements in this store.
    public SystemClock getSystemClock() {
        return systemClock;
    }
| |
    // The underlying commit log (plain or DLedger-backed, chosen in the constructor).
    @Override
    public CommitLog getCommitLog() {
        return commitLog;
    }
| |
    /**
     * Truncate commit log and consume queues down to {@code offsetToTruncate}.
     * The reput service is stopped first and replaced with a fresh instance that
     * resumes dispatching from the truncation point. No-op if the offset is at or
     * beyond the current max physical offset.
     */
    public void truncateDirtyFiles(long offsetToTruncate) {
        if (offsetToTruncate >= this.getMaxPhyOffset()) {
            return;
        }

        // Stop dispatching before the underlying files change.
        this.reputMessageService.shutdown();

        // truncate commitLog
        this.commitLog.truncateDirtyFiles(offsetToTruncate);

        // truncate consume queue
        this.truncateDirtyLogicFiles(offsetToTruncate);

        recoverTopicQueueTable();

        // A ServiceThread cannot be restarted after shutdown, so create a new one.
        this.reputMessageService = new ReputMessageService();
        this.reputMessageService.setReputFromOffset(offsetToTruncate);
        this.reputMessageService.start();
    }
| |
| @Override |
| public boolean truncateFiles(long offsetToTruncate) { |
| if (offsetToTruncate >= this.getMaxPhyOffset()) { |
| return true; |
| } |
| |
| if (!isOffsetAligned(offsetToTruncate)) { |
| LOGGER.error("Offset {} not align, truncate failed, need manual fix"); |
| return false; |
| } |
| truncateDirtyFiles(offsetToTruncate); |
| return true; |
| } |
| |
| @Override |
| public boolean isOffsetAligned(long offset) { |
| SelectMappedBufferResult mappedBufferResult = this.getCommitLogData(offset); |
| |
| if (mappedBufferResult == null) { |
| return true; |
| } |
| |
| DispatchRequest dispatchRequest = this.commitLog.checkMessageAndReturnSize(mappedBufferResult.getByteBuffer(), true, false); |
| return dispatchRequest.isSuccess(); |
| } |
| |
    /**
     * Convenience overload of {@link #getMessage(String, String, int, long, int, int, MessageFilter)}
     * using the default per-pull byte cap {@code MAX_PULL_MSG_SIZE}.
     */
    @Override
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        return getMessage(group, topic, queueId, offset, maxMsgNums, MAX_PULL_MSG_SIZE, messageFilter);
    }
| |
    /**
     * Pull up to {@code maxMsgNums} messages (bounded by {@code maxTotalMsgSize} bytes)
     * for {@code topic}/{@code queueId} starting at consume-queue {@code offset},
     * optionally filtered by {@code messageFilter}.
     *
     * <p>Walks consume queue entries, resolves each into the commit log, applies the
     * CQ-level and commit-log-level filters, and accumulates matches. Out-of-range
     * offsets produce a corrective status plus a suggested {@code nextBeginOffset}.
     *
     * @return the pull result with status/offsets populated, or {@code null} when
     *         the store is shut down or not readable
     */
    @Override
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final int maxTotalMsgSize,
        final MessageFilter messageFilter) {
        if (this.shutdown) {
            LOGGER.warn("message store has shutdown, so getMessage is forbidden");
            return null;
        }

        if (!this.runningFlags.isReadable()) {
            LOGGER.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
            return null;
        }

        long beginTime = this.getSystemClock().now();

        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        long nextBeginOffset = offset;
        long minOffset = 0;
        long maxOffset = 0;

        GetMessageResult getResult = new GetMessageResult();

        final long maxOffsetPy = this.commitLog.getMaxOffset();

        ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();

            // Classify the requested offset against the queue's current [min, max) range.
            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
            } else {
                // Cap on how many CQ entries may be scanned while filtering finds nothing.
                final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();

                long maxPullSize = Math.max(maxTotalMsgSize, 100);
                if (maxPullSize > MAX_PULL_MSG_SIZE) {
                    LOGGER.warn("The max pull size is too large maxPullSize={} topic={} queueId={}", maxPullSize, topic, queueId);
                    maxPullSize = MAX_PULL_MSG_SIZE;
                }
                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                long maxPhyOffsetPulling = 0;
                int cqFileNum = 0;

                // Outer loop: advance across CQ files until something matches, the queue
                // is exhausted, or the per-pull file-travel budget is spent.
                while (getResult.getBufferTotalSize() <= 0
                    && nextBeginOffset < maxOffset
                    && cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
                    ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFrom(nextBeginOffset);

                    if (bufferConsumeQueue == null) {
                        status = GetMessageStatus.OFFSET_FOUND_NULL;
                        nextBeginOffset = nextOffsetCorrection(nextBeginOffset, this.consumeQueueStore.rollNextFile(consumeQueue, nextBeginOffset));
                        LOGGER.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                            + maxOffset + ", but access logic queue failed. Correct nextBeginOffset to " + nextBeginOffset);
                        break;
                    }

                    try {
                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        while (bufferConsumeQueue.hasNext()
                            && nextBeginOffset < maxOffset) {
                            CqUnit cqUnit = bufferConsumeQueue.next();
                            long offsetPy = cqUnit.getPos();
                            int sizePy = cqUnit.getSize();

                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                            // Stop scanning once the filter budget is exhausted.
                            if (cqUnit.getQueueOffset() - offset > maxFilterMessageCount) {
                                break;
                            }

                            if (this.isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
                                break;
                            }

                            if (getResult.getBufferTotalSize() >= maxPullSize) {
                                break;
                            }

                            maxPhyOffsetPulling = offsetPy;

                            //Be careful, here should before the isTheBatchFull
                            nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();

                            // Skip entries that fall before the start of the next intact
                            // commit log file after a missing-message roll (see below).
                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset) {
                                    continue;
                                }
                            }

                            // Cheap CQ-level filter (tags hash) before touching the commit log.
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                continue;
                            }

                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }

                                // Commit log file likely deleted; jump to the next file's start.
                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }

                            // Full commit-log-level filter (e.g. SQL92 on properties).
                            if (messageFilter != null
                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                // release...
                                selectResult.release();
                                continue;
                            }

                            this.storeStatsService.getGetMessageTransferedMsgCount().add(cqUnit.getBatchNum());
                            getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }
                    } finally {
                        bufferConsumeQueue.release();
                    }
                }

                if (diskFallRecorded) {
                    long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                    brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                }

                // Suggest pulling from a slave when the consumer lags beyond the memory window.
                long diff = maxOffsetPy - maxPhyOffsetPulling;
                long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                getResult.setSuggestPullingFromSlave(diff > memory);
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }

        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().add(1);
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().add(1);
        }
        long elapsedTime = this.getSystemClock().now() - beginTime;
        this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);

        // lazy init no data found.
        // NOTE(review): getResult is unconditionally assigned above, so this null
        // check is dead code; kept as-is for safety.
        if (getResult == null) {
            getResult = new GetMessageResult(0);
        }

        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }
| |
    /**
     * Max (committed) logical offset of the given queue; delegates to the
     * committed variant of {@link #getMaxOffsetInQueue(String, int, boolean)}.
     */
    @Override
    public long getMaxOffsetInQueue(String topic, int queueId) {
        return getMaxOffsetInQueue(topic, queueId, true);
    }
| |
| @Override |
| public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) { |
| if (committed) { |
| ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId); |
| if (logic != null) { |
| return logic.getMaxOffsetInQueue(); |
| } |
| } else { |
| Long offset = this.consumeQueueStore.getMaxOffset(topic, queueId); |
| if (offset != null) { |
| return offset; |
| } |
| } |
| |
| return 0; |
| } |
| |
| @Override |
| public long getMinOffsetInQueue(String topic, int queueId) { |
| ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId); |
| if (logic != null) { |
| return logic.getMinOffsetInQueue(); |
| } |
| |
| return -1; |
| } |
| |
| @Override |
| public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) { |
| ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId); |
| if (consumeQueue != null) { |
| |
| ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFrom(consumeQueueOffset); |
| if (bufferConsumeQueue != null) { |
| try { |
| if (bufferConsumeQueue.hasNext()) { |
| long offsetPy = bufferConsumeQueue.next().getPos(); |
| return offsetPy; |
| } |
| } finally { |
| bufferConsumeQueue.release(); |
| } |
| } |
| } |
| |
| return 0; |
| } |
| |
| @Override |
| public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { |
| ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId); |
| if (logic != null) { |
| long resultOffset = logic.getOffsetInQueueByTime(timestamp); |
| // Make sure the result offset is in valid range. |
| resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue()); |
| resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue()); |
| return resultOffset; |
| } |
| |
| return 0; |
| } |
| |
    /**
     * Looks up and decodes one message by its commit-log offset.
     * <p>First reads the leading 4-byte TOTALSIZE field at the offset, then
     * re-reads the full message via {@link #lookMessageByOffset(long, int)}.
     *
     * @return the decoded message, or null when the offset is not readable
     */
    @Override
    public MessageExt lookMessageByOffset(long commitLogOffset) {
        SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
        if (null != sbr) {
            try {
                // 1 TOTALSIZE
                int size = sbr.getByteBuffer().getInt();
                return lookMessageByOffset(commitLogOffset, size);
            } finally {
                sbr.release();
            }
        }

        return null;
    }
| |
    /**
     * Selects the mapped buffer for one whole message at {@code commitLogOffset}.
     * <p>Reads the leading 4-byte TOTALSIZE field first, then returns a second,
     * full-size selection; the caller is responsible for releasing the result.
     *
     * @return the buffer covering the message, or null when the offset is not readable
     */
    @Override
    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
        SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
        if (null != sbr) {
            try {
                // 1 TOTALSIZE
                int size = sbr.getByteBuffer().getInt();
                return this.commitLog.getMessage(commitLogOffset, size);
            } finally {
                sbr.release();
            }
        }

        return null;
    }
| |
    /**
     * Selects the mapped buffer of {@code msgSize} bytes at {@code commitLogOffset};
     * the caller must release the returned result.
     */
    @Override
    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
        return this.commitLog.getMessage(commitLogOffset, msgSize);
    }

    /** Textual runtime statistics; delegates to the store-stats service. */
    @Override
    public String getRunningDataInfo() {
        return this.storeStatsService.toString();
    }
| |
| public String getStorePathPhysic() { |
| String storePathPhysic; |
| if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog()) { |
| storePathPhysic = ((DLedgerCommitLog) DefaultMessageStore.this.getCommitLog()).getdLedgerServer().getdLedgerConfig().getDataStorePath(); |
| } else { |
| storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); |
| } |
| return storePathPhysic; |
| } |
| |
    /** Store path of the consume-queue (logic queue) data, derived from the store root dir. */
    public String getStorePathLogic() {
        return StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
    }
| |
    /**
     * Runtime metrics map: the store-stats snapshot extended with disk-usage
     * ratios (per commit-log path and their minimum, plus the consume-queue
     * path) and the commit-log min/max physical offsets.
     */
    @Override
    public HashMap<String, String> getRuntimeInfo() {
        HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();

        {
            // The commit-log store path may be multiple paths joined by MULTI_PATH_SPLITTER;
            // report each path's usage and keep the minimum as the headline ratio.
            double minPhysicsUsedRatio = Double.MAX_VALUE;
            String commitLogStorePath = getStorePathPhysic();
            String[] paths = commitLogStorePath.trim().split(MixAll.MULTI_PATH_SPLITTER);
            for (String clPath : paths) {
                // -1 marks a path that does not exist (yet).
                double physicRatio = UtilAll.isPathExists(clPath) ?
                    UtilAll.getDiskPartitionSpaceUsedPercent(clPath) : -1;
                result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
                minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio);
            }
            result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio));
        }

        {
            double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathLogic());
            result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
        }

        result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
        result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));

        return result;
    }
| |
    /** Max physical offset of the commit log; delegates to the commit log. */
    @Override
    public long getMaxPhyOffset() {
        return this.commitLog.getMaxOffset();
    }

    /** Min physical offset of the commit log; delegates to the commit log. */
    @Override
    public long getMinPhyOffset() {
        return this.commitLog.getMinOffset();
    }

    /** Start offset of the last commit-log mapped file; delegates to the commit log. */
    @Override
    public long getLastFileFromOffset() {
        return this.commitLog.getLastFileFromOffset();
    }

    /** Ensures the last mapped file covers {@code startOffset}; delegates to the commit log. */
    @Override
    public boolean getLastMappedFile(long startOffset) {
        return this.commitLog.getLastMappedFile(startOffset);
    }
| |
| @Override |
| public long getEarliestMessageTime(String topic, int queueId) { |
| ConsumeQueueInterface logicQueue = this.findConsumeQueue(topic, queueId); |
| if (logicQueue != null) { |
| return getStoreTime(logicQueue.getEarliestUnit()); |
| } |
| |
| return -1; |
| } |
| |
| protected long getStoreTime(CqUnit result) { |
| if (result != null) { |
| try { |
| final long phyOffset = result.getPos(); |
| final int size = result.getSize(); |
| long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); |
| return storeTime; |
| } catch (Exception e) { |
| } |
| } |
| return -1; |
| } |
| |
    /**
     * Store timestamp of the earliest message in the whole store, read from the
     * commit log at its min physical offset.
     * <p>The probe window is twice the max message size — presumably so the
     * first complete message always falls inside it; TODO confirm against
     * {@code pickupStoreTimestamp}.
     */
    @Override
    public long getEarliestMessageTime() {
        final long minPhyOffset = this.getMinPhyOffset();
        final int size = this.messageStoreConfig.getMaxMessageSize() * 2;
        return this.getCommitLog().pickupStoreTimestamp(minPhyOffset, size);
    }
| |
    /**
     * Store timestamp of the message at the given consume-queue offset,
     * or -1 when the queue is unknown or the lookup fails.
     */
    @Override
    public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
        ConsumeQueueInterface logicQueue = this.findConsumeQueue(topic, queueId);
        if (logicQueue != null) {
            return getStoreTime(logicQueue.get(consumeQueueOffset));
        }

        return -1;
    }

    /**
     * Total number of messages in the given consume queue, or -1 when the
     * queue is unknown.
     */
    @Override
    public long getMessageTotalInQueue(String topic, int queueId) {
        ConsumeQueueInterface logicQueue = this.findConsumeQueue(topic, queueId);
        if (logicQueue != null) {
            return logicQueue.getMessageTotalInQueue();
        }

        return -1;
    }
| |
    /**
     * Raw commit-log data starting at {@code offset} (used e.g. by replication).
     *
     * @return the selected buffer, or null when the store has shut down
     */
    @Override
    public SelectMappedBufferResult getCommitLogData(final long offset) {
        if (this.shutdown) {
            LOGGER.warn("message store has shutdown, so getPhyQueueData is forbidden");
            return null;
        }

        return this.commitLog.getData(offset);
    }

    /**
     * Raw commit-log data covering [offset, offset + size), possibly spanning
     * multiple mapped files.
     *
     * @return the buffers, or null when the store has shut down
     */
    @Override
    public List<SelectMappedBufferResult> getBulkCommitLogData(final long offset, final int size) {
        if (this.shutdown) {
            LOGGER.warn("message store has shutdown, so getBulkCommitLogData is forbidden");
            return null;
        }

        return this.commitLog.getBulkData(offset, size);
    }
| |
| @Override |
| public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) { |
| if (this.shutdown) { |
| LOGGER.warn("message store has shutdown, so appendToCommitLog is forbidden"); |
| return false; |
| } |
| |
| boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength); |
| if (result) { |
| this.reputMessageService.wakeup(); |
| } else { |
| LOGGER.error( |
| "DefaultMessageStore#appendToCommitLog: failed to append data to commitLog, physical offset={}, data " |
| + "length={}", startOffset, data.length); |
| } |
| |
| return result; |
| } |
| |
    /** Triggers a manual expired-commit-log deletion round via the clean service. */
    @Override
    public void executeDeleteFilesManually() {
        this.cleanCommitLogService.executeDeleteFilesManually();
    }
| |
| @Override |
| public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { |
| QueryMessageResult queryMessageResult = new QueryMessageResult(); |
| |
| long lastQueryMsgTime = end; |
| |
| for (int i = 0; i < 3; i++) { |
| QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); |
| if (queryOffsetResult.getPhyOffsets().isEmpty()) { |
| break; |
| } |
| |
| Collections.sort(queryOffsetResult.getPhyOffsets()); |
| |
| queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset()); |
| queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp()); |
| |
| for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) { |
| long offset = queryOffsetResult.getPhyOffsets().get(m); |
| |
| try { |
| |
| boolean match = true; |
| MessageExt msg = this.lookMessageByOffset(offset); |
| if (0 == m) { |
| lastQueryMsgTime = msg.getStoreTimestamp(); |
| } |
| |
| // String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR); |
| // if (topic.equals(msg.getTopic())) { |
| // for (String k : keyArray) { |
| // if (k.equals(key)) { |
| // match = true; |
| // break; |
| // } |
| // } |
| // } |
| |
| if (match) { |
| SelectMappedBufferResult result = this.commitLog.getData(offset, false); |
| if (result != null) { |
| int size = result.getByteBuffer().getInt(0); |
| result.getByteBuffer().limit(size); |
| result.setSize(size); |
| queryMessageResult.addMessage(result); |
| } |
| } else { |
| LOGGER.warn("queryMessage hash duplicate, topic={}, key={}", topic, key); |
| } |
| } catch (Exception e) { |
| LOGGER.error("queryMessage exception", e); |
| } |
| } |
| |
| if (queryMessageResult.getBufferTotalSize() > 0) { |
| break; |
| } |
| |
| if (lastQueryMsgTime < begin) { |
| break; |
| } |
| } |
| |
| return queryMessageResult; |
| } |
| |
    /** Forwards the new HA master address to the HA service, if one exists. */
    @Override
    public void updateHaMasterAddress(String newAddr) {
        if (this.haService != null) {
            this.haService.updateHaMasterAddress(newAddr);
        }
    }

    /** Forwards the new master address to the HA service, if one exists. */
    @Override
    public void updateMasterAddress(String newAddr) {
        if (this.haService != null) {
            this.haService.updateMasterAddress(newAddr);
        }
    }

    /** Records the number of alive replicas in this broker group. */
    @Override
    public void setAliveReplicaNumInGroup(int aliveReplicaNums) {
        this.aliveReplicasNum = aliveReplicaNums;
    }

    /** Wakes the HA client thread, if an HA service exists. */
    @Override
    public void wakeupHAClient() {
        if (this.haService != null) {
            this.haService.getHAClient().wakeup();
        }
    }

    /** Last recorded number of alive replicas in this broker group. */
    @Override
    public int getAliveReplicaNumInGroup() {
        return this.aliveReplicasNum;
    }
| |
    /**
     * How many bytes the slowest slave lags behind the commit-log max offset.
     *
     * @return the lag in bytes, or -1 when HA is unavailable (no HA service,
     *         duplication enabled, or DLedger commit log enabled)
     */
    @Override
    public long slaveFallBehindMuch() {
        if (this.haService == null || this.messageStoreConfig.isDuplicationEnable() || this.messageStoreConfig.isEnableDLegerCommitLog()) {
            LOGGER.warn("haServer is null or duplication is enable or enableDLegerCommitLog is true");
            return -1;
        } else {
            return this.commitLog.getMaxOffset() - this.haService.getPush2SlaveMaxOffset().get();
        }

    }

    /** Current time from the store's system clock. */
    @Override
    public long now() {
        return this.systemClock.now();
    }
| |
    /**
     * Destroys the consume queues of every topic that is not in {@code topics},
     * not a system topic, and not an LMQ topic, removing them from the queue
     * table and (optionally) dropping their stats.
     *
     * @param topics the set of topics that should be kept
     * @return always 0 (the count is not tracked)
     */
    @Override
    public int cleanUnusedTopic(Set<String> topics) {
        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.getConsumeQueueTable().entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
            String topic = next.getKey();

            if (!topics.contains(topic) && !TopicValidator.isSystemTopic(topic) && !MixAll.isLmq(topic)) {
                ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = next.getValue();
                for (ConsumeQueueInterface cq : queueTable.values()) {
                    this.consumeQueueStore.destroy(cq);
                    LOGGER.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
                        cq.getTopic(),
                        cq.getQueueId()
                    );

                    this.consumeQueueStore.removeTopicQueueTable(cq.getTopic(), cq.getQueueId());
                }
                // Remove via the iterator to avoid ConcurrentModificationException.
                it.remove();

                if (this.brokerConfig.isAutoDeleteUnusedStats()) {
                    this.brokerStatsManager.onTopicDeleted(topic);
                }

                LOGGER.info("cleanUnusedTopic: {},topic destroyed", topic);
            }
        }

        return 0;
    }
| |
    /**
     * Removes consume-queue entries that point below the commit log's current
     * min offset (i.e. whose physical data has already been deleted).
     */
    @Override
    public void cleanExpiredConsumerQueue() {
        long minCommitLogOffset = this.commitLog.getMinOffset();

        this.consumeQueueStore.cleanExpired(minCommitLogOffset);
    }
| |
    /**
     * Builds a map of message id -> consume-queue offset for messages in
     * [minOffset, maxOffset) of the given queue.
     *
     * @param storeHost broker store address baked into each message id
     * @return the id map; empty when the store is shut down, the queue is
     *         unknown, or the (clamped) range is empty
     */
    public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset,
        SocketAddress storeHost) {
        Map<String, Long> messageIds = new HashMap<String, Long>();
        if (this.shutdown) {
            return messageIds;
        }

        ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            // Clamp the requested range into the queue's actual offset range.
            minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQueue());
            maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQueue());

            if (maxOffset == 0) {
                return messageIds;
            }

            long nextOffset = minOffset;
            while (nextOffset < maxOffset) {
                ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFrom(nextOffset);
                try {
                    if (bufferConsumeQueue != null && bufferConsumeQueue.hasNext()) {
                        while (bufferConsumeQueue.hasNext()) {
                            CqUnit cqUnit = bufferConsumeQueue.next();
                            long offsetPy = cqUnit.getPos();
                            InetSocketAddress inetSocketAddress = (InetSocketAddress) storeHost;
                            // IPv6 ids carry a 16-byte address, IPv4 a 4-byte one; plus port (4) and offset (8).
                            int msgIdLength = (inetSocketAddress.getAddress() instanceof Inet6Address) ? 16 + 4 + 8 : 4 + 4 + 8;
                            final ByteBuffer msgIdMemory = ByteBuffer.allocate(msgIdLength);
                            String msgId =
                                MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
                            messageIds.put(msgId, cqUnit.getQueueOffset());
                            // Batch units advance the logical offset by their batch size.
                            nextOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();
                            if (nextOffset >= maxOffset) {
                                return messageIds;
                            }
                        }
                    } else {
                        return messageIds;
                    }
                } finally {
                    if (bufferConsumeQueue != null) {
                        bufferConsumeQueue.release();
                    }
                }
            }
        }
        return messageIds;
    }
| |
| @Override |
| public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) { |
| |
| final long maxOffsetPy = this.commitLog.getMaxOffset(); |
| |
| ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId); |
| if (consumeQueue != null) { |
| CqUnit cqUnit = consumeQueue.get(consumeOffset); |
| |
| if (cqUnit != null) { |
| long offsetPy = cqUnit.getPos(); |
| return checkInDiskByCommitOffset(offsetPy, maxOffsetPy); |
| } else { |
| return false; |
| } |
| } |
| return false; |
| } |
| |
    /** Bytes of commit-log data not yet dispatched to consume queues/indexes. */
    @Override
    public long dispatchBehindBytes() {
        return this.reputMessageService.behind();
    }

    /** Flushes the commit log; returns the flushed position. */
    @Override
    public long flush() {
        return this.commitLog.flush();
    }

    /** Current flushed-where position of the commit log. */
    @Override
    public long getFlushedWhere() {
        return this.commitLog.getFlushedWhere();
    }
| |
    /**
     * Resets the commit-log write offset to {@code phyOffset} and rebuilds the
     * topic-queue offset table by scanning messages from that point forward.
     *
     * @param phyOffset target physical offset; -1 means scan from 0
     * @return true when the commit log accepted the reset and the rebuilt
     *         offset table was installed
     */
    @Override
    public boolean resetWriteOffset(long phyOffset) {
        //copy a new map
        ConcurrentHashMap<String, Long> newMap = new ConcurrentHashMap<>(consumeQueueStore.getTopicQueueTable());
        SelectMappedBufferResult lastBuffer = null;
        long startReadOffset = phyOffset == -1 ? 0 : phyOffset;
        while ((lastBuffer = selectOneMessageByOffset(startReadOffset)) != null) {
            try {
                // Jump to the actual start of the mapped file when the requested
                // offset lands before it.
                if (lastBuffer.getStartOffset() > startReadOffset) {
                    startReadOffset = lastBuffer.getStartOffset();
                    continue;
                }

                ByteBuffer bb = lastBuffer.getByteBuffer();
                // Magic code sits 4 bytes after TOTALSIZE.
                int magicCode = bb.getInt(bb.position() + 4);
                if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
                    // Blank filler at file end: skip its recorded size.
                    startReadOffset += bb.getInt(bb.position());
                    continue;
                } else if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE) {
                    throw new RuntimeException("Unknown magicCode: " + magicCode);
                }

                lastBuffer.getByteBuffer().mark();

                DispatchRequest dispatchRequest = checkMessageAndReturnSize(lastBuffer.getByteBuffer(), true, messageStoreConfig.isDuplicationEnable(), true);
                if (!dispatchRequest.isSuccess())
                    break;

                lastBuffer.getByteBuffer().reset();

                MessageExt msg = MessageDecoder.decode(lastBuffer.getByteBuffer(), true, false, false, false, true);
                if (msg == null) {
                    break;
                }
                String key = msg.getTopic() + "-" + msg.getQueueId();
                Long cur = newMap.get(key);
                // Keeps the smallest queue offset seen at or after phyOffset —
                // presumably to rewind each queue's logical offset; verify intent.
                if (cur != null && cur > msg.getQueueOffset()) {
                    newMap.put(key, msg.getQueueOffset());
                }
                startReadOffset += msg.getStoreSize();
            } catch (Throwable e) {
                LOGGER.error("resetWriteOffset error.", e);
                // NOTE(review): startReadOffset is not advanced on exception, so a
                // persistent decode error would retry the same offset — confirm the
                // loop cannot spin indefinitely.
            } finally {
                if (lastBuffer != null)
                    lastBuffer.release();
            }
        }
        if (this.commitLog.resetOffset(phyOffset)) {
            this.consumeQueueStore.setTopicQueueTable(newMap);
            return true;
        } else {
            return false;
        }
    }
| |
    /** Confirm (replication-acknowledged) offset of the commit log. */
    @Override
    public long getConfirmOffset() {
        return this.commitLog.getConfirmOffset();
    }

    /** Sets the confirm offset on the commit log. */
    @Override
    public void setConfirmOffset(long phyOffset) {
        this.commitLog.setConfirmOffset(phyOffset);
    }
| |
| @Override |
| public byte[] calcDeltaChecksum(long from, long to) { |
| if (from < 0 || to <= from) { |
| return new byte[0]; |
| } |
| |
| int size = (int) (to - from); |
| |
| if (size > this.messageStoreConfig.getMaxChecksumRange()) { |
| LOGGER.error("Checksum range from {}, size {} exceeds threshold {}", from, size, this.messageStoreConfig.getMaxChecksumRange()); |
| return null; |
| } |
| |
| List<MessageExt> msgList = new ArrayList<>(); |
| List<SelectMappedBufferResult> bufferResultList = this.getBulkCommitLogData(from, size); |
| if (bufferResultList.isEmpty()) { |
| return new byte[0]; |
| } |
| |
| for (SelectMappedBufferResult bufferResult : bufferResultList) { |
| msgList.addAll(MessageDecoder.decodesBatch(bufferResult.getByteBuffer(), true, false, false)); |
| bufferResult.release(); |
| } |
| |
| if (msgList.isEmpty()) { |
| return new byte[0]; |
| } |
| |
| ByteBuffer byteBuffer = ByteBuffer.allocate(size); |
| for (MessageExt msg : msgList) { |
| try { |
| byteBuffer.put(MessageDecoder.encodeUniquely(msg, false)); |
| } catch (IOException ignore) { |
| } |
| } |
| |
| return Hashing.murmur3_128().hashBytes(byteBuffer.array()).asBytes(); |
| } |
| |
    /** Sets the commit log's mapped-file queue offset to {@code phyOffset}. */
    @Override
    public void setPhysicalOffset(long phyOffset) {
        this.commitLog.setMappedFileQueueOffset(phyOffset);
    }

    /** Whether the commit log has no mapped files at all. */
    @Override
    public boolean isMappedFilesEmpty() {
        return this.commitLog.isMappedFilesEmpty();
    }
| |
| @Override |
| public MessageExt lookMessageByOffset(long commitLogOffset, int size) { |
| SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, size); |
| if (null != sbr) { |
| try { |
| return MessageDecoder.decode(sbr.getByteBuffer(), true, false); |
| } finally { |
| sbr.release(); |
| } |
| } |
| |
| return null; |
| } |
| |
    /** Finds the consume queue for topic/queueId, creating it if absent. */
    @Override
    public ConsumeQueueInterface findConsumeQueue(String topic, int queueId) {
        return this.consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
    }

    /**
     * Picks the offset to hand back to a consumer after a miss: the corrected
     * {@code newOffset} on a master (or a slave with offset check enabled),
     * otherwise the consumer's original {@code oldOffset}.
     */
    private long nextOffsetCorrection(long oldOffset, long newOffset) {
        long nextOffset = oldOffset;
        if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE ||
            this.getMessageStoreConfig().isOffsetCheckInSlave()) {
            nextOffset = newOffset;
        }
        return nextOffset;
    }
| |
| private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) { |
| long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); |
| return (maxOffsetPy - offsetPy) > memory; |
| } |
| |
| private boolean isTheBatchFull(int sizePy, int unitBatchNum, int maxMsgNums, long maxMsgSize, int bufferTotal, |
| int messageTotal, boolean isInDisk) { |
| |
| if (0 == bufferTotal || 0 == messageTotal) { |
| return false; |
| } |
| |
| if (messageTotal + unitBatchNum > maxMsgNums) { |
| return true; |
| } |
| |
| if (bufferTotal + sizePy > maxMsgSize) { |
| return true; |
| } |
| |
| if (isInDisk) { |
| if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) { |
| return true; |
| } |
| |
| if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) { |
| return true; |
| } |
| } else { |
| if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) { |
| return true; |
| } |
| |
| if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| private void deleteFile(final String fileName) { |
| File file = new File(fileName); |
| boolean result = file.delete(); |
| LOGGER.info(fileName + (result ? " delete OK" : " delete Failed")); |
| } |
| |
| /** |
| * @throws IOException |
| */ |
| private void createTempFile() throws IOException { |
| String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()); |
| File file = new File(fileName); |
| UtilAll.ensureDirOK(file.getParent()); |
| boolean result = file.createNewFile(); |
| LOGGER.info(fileName + (result ? " create OK" : " already exists")); |
| MixAll.string2File(Long.toString(MixAll.getPID()), file.getAbsolutePath()); |
| } |
| |
    /**
     * Registers the store's periodic background tasks on the scheduled executor:
     * file cleaning, self check, commit-log lock debugging, and checkpoint flush.
     */
    private void addScheduleTask() {

        // Periodically delete expired commit-log / consume-queue files.
        this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run2() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

        // Periodic consistency self-check of commit log and consume queues.
        this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run2() {
                DefaultMessageStore.this.checkSelf();
            }
        }, 1, 10, TimeUnit.MINUTES);

        // Debug aid: when enabled, dump a jstack to a file if the commit-log
        // put lock has been held for more than 1 second.
        this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run2() {
                if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
                    try {
                        if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
                            long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                            if (lockTime > 1000 && lockTime < 10000000) {

                                String stack = UtilAll.jstack();
                                final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
                                    + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
                                MixAll.string2FileNotSafe(stack, fileName);
                            }
                        }
                    } catch (Exception e) {
                        // Best-effort diagnostics; never let this task die.
                    }
                }
            }
        }, 1, 1, TimeUnit.SECONDS);

        // Flush the store checkpoint every second.
        this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run2() {
                DefaultMessageStore.this.storeCheckpoint.flush();
            }
        }, 1, 1, TimeUnit.SECONDS);

        // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        // @Override
        // public void run() {
        // DefaultMessageStore.this.cleanExpiredConsumerQueue();
        // }
        // }, 1, 1, TimeUnit.HOURS);
    }
| |
    /** Runs one round of each file-cleaning service (commit log, consume queue, logic-offset correction). */
    private void cleanFilesPeriodically() {
        this.cleanCommitLogService.run();
        this.cleanConsumeQueueService.run();
        this.correctLogicOffsetService.run();
    }

    /** Runs the self-consistency checks of the commit log and the consume-queue store. */
    private void checkSelf() {
        this.commitLog.checkSelf();
        this.consumeQueueStore.checkSelf();
    }
| |
| private boolean isTempFileExist() { |
| String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()); |
| File file = new File(fileName); |
| return file.exists(); |
| } |
| |
    /**
     * Recovers the store at startup: first the consume queues, then the commit
     * log (normal or abnormal path depending on whether the last exit was
     * clean), and finally the topic-queue offset table. Logs the duration of
     * each phase.
     *
     * @param lastExitOK true when the previous shutdown was clean (no abort file)
     */
    private void recover(final boolean lastExitOK) {
        long recoverCqStart = System.currentTimeMillis();
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
        long recoverCqEnd = System.currentTimeMillis();

        if (lastExitOK) {
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }
        long recoverClogEnd = System.currentTimeMillis();
        this.recoverTopicQueueTable();
        long recoverOffsetEnd = System.currentTimeMillis();

        LOGGER.info("Recover end total:{} recoverCq:{} recoverClog:{} recoverOffset:{}",
            recoverOffsetEnd - recoverCqStart, recoverCqEnd - recoverCqStart, recoverClogEnd - recoverCqEnd, recoverOffsetEnd - recoverClogEnd);
    }
| |
    /** The message-store configuration. */
    @Override
    public MessageStoreConfig getMessageStoreConfig() {
        return messageStoreConfig;
    }

    /** The transient store (direct-buffer) pool. */
    @Override
    public TransientStorePool getTransientStorePool() {
        return transientStorePool;
    }

    /** Recovers all consume queues; returns their max physical offset. */
    private long recoverConsumeQueue() {
        return this.consumeQueueStore.recover();
    }

    /** Rebuilds the topic-queue offset table from the commit log's min offset. */
    public void recoverTopicQueueTable() {
        long minPhyOffset = this.commitLog.getMinOffset();
        this.consumeQueueStore.recoverOffsetTable(minPhyOffset);
    }

    /** The service that pre-allocates mapped files. */
    @Override
    public AllocateMappedFileService getAllocateMappedFileService() {
        return allocateMappedFileService;
    }
| |
    /** The store statistics service. */
    @Override
    public StoreStatsService getStoreStatsService() {
        return storeStatsService;
    }

    /** The running flags (same object as {@link #getRunningFlags()}). */
    public RunningFlags getAccessRights() {
        return runningFlags;
    }

    /** Topic -> (queueId -> consume queue) table from the consume-queue store. */
    public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> getConsumeQueueTable() {
        return consumeQueueStore.getConsumeQueueTable();
    }

    /** The store checkpoint. */
    @Override
    public StoreCheckpoint getStoreCheckpoint() {
        return storeCheckpoint;
    }

    /** The HA service (may be null — see slaveFallBehindMuch). */
    @Override
    public HAService getHaService() {
        return haService;
    }

    /** The running flags. */
    @Override
    public RunningFlags getRunningFlags() {
        return runningFlags;
    }

    /** Feeds one dispatch request through every registered commit-log dispatcher, in order. */
    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }
| |
    /** Writes one message's position info into its consume queue. */
    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        this.consumeQueueStore.putMessagePositionInfoWrapper(dispatchRequest);
    }

    /** Validates one message in the buffer and returns its dispatch info; delegates to the commit log. */
    @Override
    public DispatchRequest checkMessageAndReturnSize(final ByteBuffer byteBuffer, final boolean checkCRC,
        final boolean checkDupInfo, final boolean readBody) {
        return this.commitLog.checkMessageAndReturnSize(byteBuffer, checkCRC, checkDupInfo, readBody);
    }

    /** State-machine version; this implementation always reports 0. */
    @Override
    public long getStateMachineVersion() {
        return 0L;
    }

    /** The broker statistics manager. */
    public BrokerStatsManager getBrokerStatsManager() {
        return brokerStatsManager;
    }

    /** The broker configuration. */
    public BrokerConfig getBrokerConfig() {
        return brokerConfig;
    }
| |
    /** Number of free buffers left in the transient store pool. */
    public int remainTransientStoreBufferNumbs() {
        return this.transientStorePool.availableBufferNums();
    }

    /** True when the transient store pool has no free buffers left. */
    @Override
    public boolean isTransientStorePoolDeficient() {
        return remainTransientStoreBufferNumbs() == 0;
    }

    /** Bytes written but not yet committed; delegates to the commit log. */
    @Override
    public long remainHowManyDataToCommit() {
        return this.commitLog.remainHowManyDataToCommit();
    }

    /** Bytes committed but not yet flushed; delegates to the commit log. */
    @Override
    public long remainHowManyDataToFlush() {
        return this.commitLog.remainHowManyDataToFlush();
    }
| |
    /** The ordered list of commit-log dispatchers (see doDispatch). */
    @Override
    public LinkedList<CommitLogDispatcher> getDispatcherList() {
        return this.dispatcherList;
    }

    /** Records the in-process master store reference. */
    @Override
    public void setMasterStoreInProcess(MessageStore masterStoreInProcess) {
        this.masterStoreInProcess = masterStoreInProcess;
    }

    /** The in-process master store reference, if any. */
    @Override
    public MessageStore getMasterStoreInProcess() {
        return this.masterStoreInProcess;
    }

    /** Copies {@code size} bytes of commit-log data at {@code offset} into {@code byteBuffer}. */
    @Override
    public boolean getData(long offset, int size, ByteBuffer byteBuffer) {
        return this.commitLog.getData(offset, size, byteBuffer);
    }
| |
| @Override |
| public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) { |
| ConcurrentMap<Integer, ConsumeQueueInterface> map = this.getConsumeQueueTable().get(topic); |
| if (map == null) { |
| return null; |
| } |
| return map.get(queueId); |
| } |
| |
| @Override |
| public void unlockMappedFile(final MappedFile mappedFile) { |
| this.scheduledExecutorService.schedule(new Runnable() { |
| @Override |
| public void run() { |
| mappedFile.munlock(); |
| } |
| }, 6, TimeUnit.SECONDS); |
| } |
| |
    /** Performance counters for store operations. */
    @Override
    public PerfCounter.Ticks getPerfCounter() {
        return perfs;
    }

    /** The consume-queue store. */
    @Override
    public ConsumeQueueStore getQueueStore() {
        return consumeQueueStore;
    }
| |
    /** Hook invoked after a commit-log append; no-op in this implementation. */
    @Override
    public void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult result, MappedFile commitLogFile) {
        // empty
    }

    /**
     * Hook invoked when a commit-log entry is dispatched: forwards to
     * {@link #doDispatch} unless dispatching is disabled or the entry marks a
     * file end.
     */
    @Override
    public void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, MappedFile commitLogFile,
        boolean isRecover, boolean isFileEnd) {
        if (doDispatch && !isFileEnd) {
            this.doDispatch(dispatchRequest);
        }
    }

    /** Whether the store is configured for synchronous disk flush. */
    @Override
    public boolean isSyncDiskFlush() {
        return FlushDiskType.SYNC_FLUSH == this.getMessageStoreConfig().getFlushDiskType();
    }

    /** Whether this broker is configured as a synchronous master. */
    @Override
    public boolean isSyncMaster() {
        return BrokerRole.SYNC_MASTER == this.getMessageStoreConfig().getBrokerRole();
    }
| |
    /**
     * Assigns consume-queue offsets for a message about to be stored.
     * Only non-transactional and transaction-commit messages get an offset;
     * prepared/rollback messages are skipped.
     */
    @Override
    public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            this.consumeQueueStore.assignQueueOffset(msg, messageNum);
        }
    }

    /** The topic config from the consume-queue store, if present. */
    @Override
    public Optional<TopicConfig> getTopicConfig(String topic) {
        return this.consumeQueueStore.getTopicConfig(topic);
    }

    /** Installs the topic-config table on the consume-queue store. */
    public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
        this.consumeQueueStore.setTopicConfigTable(topicConfigTable);
    }
| |
| public BrokerIdentity getBrokerIdentity() { |
| if (messageStoreConfig.isEnableDLegerCommitLog()) { |
| return new BrokerIdentity( |
| brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), |
| Integer.parseInt(messageStoreConfig.getdLegerSelfId().substring(1)), brokerConfig.isInBrokerContainer()); |
| } else { |
| return new BrokerIdentity( |
| brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), |
| brokerConfig.getBrokerId(), brokerConfig.isInBrokerContainer()); |
| } |
| } |
| |
| class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { |
| |
| @Override |
| public void dispatch(DispatchRequest request) { |
| final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); |
| switch (tranType) { |
| case MessageSysFlag.TRANSACTION_NOT_TYPE: |
| case MessageSysFlag.TRANSACTION_COMMIT_TYPE: |
| DefaultMessageStore.this.putMessagePositionInfo(request); |
| break; |
| case MessageSysFlag.TRANSACTION_PREPARED_TYPE: |
| case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: |
| break; |
| } |
| } |
| } |
| |
    /**
     * Dispatcher that builds index entries for commit-log messages, active only
     * when the message index is enabled in the store config.
     */
    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    }
| |
| class CleanCommitLogService { |
| |
        // Number of cleaning rounds a manual delete request stays effective for.
        private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
        // Optional system-property overrides for the disk ratios; empty string
        // means "use the store config" (see the getters below).
        private final String diskSpaceWarningLevelRatio =
            System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "");

        private final String diskSpaceCleanForciblyRatio =
            System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "");
        private long lastRedeleteTimestamp = 0;

        // Remaining manual-delete rounds; set by executeDeleteFilesManually.
        private volatile int manualDeleteFileSeveralTimes = 0;

        private volatile boolean cleanImmediately = false;

        private int forceCleanFailedTimes = 0;
| |
| double getDiskSpaceWarningLevelRatio() { |
| double finalDiskSpaceWarningLevelRatio; |
| if ("".equals(diskSpaceWarningLevelRatio)) { |
| finalDiskSpaceWarningLevelRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskSpaceWarningLevelRatio() / 100.0; |
| } else { |
| finalDiskSpaceWarningLevelRatio = Double.parseDouble(diskSpaceWarningLevelRatio); |
| } |
| |
| if (finalDiskSpaceWarningLevelRatio > 0.90) { |
| finalDiskSpaceWarningLevelRatio = 0.90; |
| } |
| if (finalDiskSpaceWarningLevelRatio < 0.35) { |
| finalDiskSpaceWarningLevelRatio = 0.35; |
| } |
| |
| return finalDiskSpaceWarningLevelRatio; |
| } |
| |
| double getDiskSpaceCleanForciblyRatio() { |
| double finalDiskSpaceCleanForciblyRatio; |
| if ("".equals(diskSpaceCleanForciblyRatio)) { |
| finalDiskSpaceCleanForciblyRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskSpaceCleanForciblyRatio() / 100.0; |
| } else { |
| finalDiskSpaceCleanForciblyRatio = Double.parseDouble(diskSpaceCleanForciblyRatio); |
| } |
| |
| if (finalDiskSpaceCleanForciblyRatio > 0.85) { |
| finalDiskSpaceCleanForciblyRatio = 0.85; |
| } |
| if (finalDiskSpaceCleanForciblyRatio < 0.30) { |
| finalDiskSpaceCleanForciblyRatio = 0.30; |
| } |
| |
| return finalDiskSpaceCleanForciblyRatio; |
| } |
| |
| public void executeDeleteFilesManually() { |
| this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES; |
| DefaultMessageStore.LOGGER.info("executeDeleteFilesManually was invoked"); |
| } |
| |
| public void run() { |
| try { |
| this.deleteExpiredFiles(); |
| this.reDeleteHangedFile(); |
| } catch (Throwable e) { |
| DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e); |
| } |
| } |
| |
| private void deleteExpiredFiles() { |
| int deleteCount = 0; |
| long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); |
| int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); |
| int destroyMappedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); |
| int deleteFileBatchMax = DefaultMessageStore.this.getMessageStoreConfig().getDeleteFileBatchMax(); |
| |
| boolean isTimeUp = this.isTimeToDelete(); |
| boolean isUsageExceedsThreshold = this.isSpaceToDelete(); |
| boolean isManualDelete = this.manualDeleteFileSeveralTimes > 0; |
| |
| if (isTimeUp || isUsageExceedsThreshold || isManualDelete) { |
| |
| if (isManualDelete) { |
| this.manualDeleteFileSeveralTimes--; |
| } |
| |
| boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; |
| |
| LOGGER.info("begin to delete before {} hours file. isTimeUp: {} isUsageExceedsThreshold: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {} deleteFileBatchMax: {}", |
| fileReservedTime, |
| isTimeUp, |
| isUsageExceedsThreshold, |
| manualDeleteFileSeveralTimes, |
| cleanAtOnce, |
| deleteFileBatchMax); |
| |
| fileReservedTime *= 60 * 60 * 1000; |
| |
| deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, |
| destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax); |
| if (deleteCount > 0) { |
| } else if (isUsageExceedsThreshold) { |
| LOGGER.warn("disk space will be full soon, but delete file failed."); |
| } |
| } |
| } |
| |
| private void reDeleteHangedFile() { |
| int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval(); |
| long currentTimestamp = System.currentTimeMillis(); |
| if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { |
| this.lastRedeleteTimestamp = currentTimestamp; |
| int destroyMappedFileIntervalForcibly = |
| DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); |
| if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMappedFileIntervalForcibly)) { |
| } |
| } |
| } |
| |
| public String getServiceName() { |
| return DefaultMessageStore.this.brokerConfig.getLoggerIdentifier() + CleanCommitLogService.class.getSimpleName(); |
| } |
| |
| private boolean isTimeToDelete() { |
| String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen(); |
| if (UtilAll.isItTimeToDo(when)) { |
| DefaultMessageStore.LOGGER.info("it's time to reclaim disk space, " + when); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| private boolean isSpaceToDelete() { |
| cleanImmediately = false; |
| |
| String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); |
| String[] storePaths = commitLogStorePath.trim().split(MixAll.MULTI_PATH_SPLITTER); |
| Set<String> fullStorePath = new HashSet<>(); |
| double minPhysicRatio = 100; |
| String minStorePath = null; |
| for (String storePathPhysic : storePaths) { |
| double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); |
| if (minPhysicRatio > physicRatio) { |
| minPhysicRatio = physicRatio; |
| minStorePath = storePathPhysic; |
| } |
| if (physicRatio > getDiskSpaceCleanForciblyRatio()) { |
| fullStorePath.add(storePathPhysic); |
| } |
| } |
| DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); |
| if (minPhysicRatio > getDiskSpaceWarningLevelRatio()) { |
| boolean diskFull = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); |
| if (diskFull) { |
| DefaultMessageStore.LOGGER.error("physic disk maybe full soon " + minPhysicRatio + |
| ", so mark disk full, storePathPhysic=" + minStorePath); |
| } |
| |
| cleanImmediately = true; |
| return true; |
| } else if (minPhysicRatio > getDiskSpaceCleanForciblyRatio()) { |
| cleanImmediately = true; |
| return true; |
| } else { |
| boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); |
| if (!diskOK) { |
| DefaultMessageStore.LOGGER.info("physic disk space OK " + minPhysicRatio + |
| ", so mark disk ok, storePathPhysic=" + minStorePath); |
| } |
| } |
| |
| String storePathLogics = StorePathConfigHelper |
| .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); |
| double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); |
| if (logicsRatio > getDiskSpaceWarningLevelRatio()) { |
| boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); |
| if (diskOK) { |
| DefaultMessageStore.LOGGER.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full"); |
| } |
| |
| cleanImmediately = true; |
| return true; |
| } else if (logicsRatio > getDiskSpaceCleanForciblyRatio()) { |
| cleanImmediately = true; |
| return true; |
| } else { |
| boolean diskOK = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); |
| if (!diskOK) { |
| DefaultMessageStore.LOGGER.info("logics disk space OK " + logicsRatio + ", so mark disk ok"); |
| } |
| } |
| |
| double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; |
| int replicasPerPartition = DefaultMessageStore.this.getMessageStoreConfig().getReplicasPerDiskPartition(); |
| // Only one commitLog in node |
| if (replicasPerPartition <= 1) { |
| if (minPhysicRatio < 0 || minPhysicRatio > ratio) { |
| DefaultMessageStore.LOGGER.info("commitLog disk maybe full soon, so reclaim space, " + minPhysicRatio); |
| return true; |
| } |
| |
| if (logicsRatio < 0 || logicsRatio > ratio) { |
| DefaultMessageStore.LOGGER.info("consumeQueue disk maybe full soon, so reclaim space, " + logicsRatio); |
| return true; |
| } |
| return false; |
| } else { |
| long majorFileSize = DefaultMessageStore.this.getMajorFileSize(); |
| long partitionLogicalSize = UtilAll.getDiskPartitionTotalSpace(minStorePath) / replicasPerPartition; |
| double logicalRatio = 1.0 * majorFileSize / partitionLogicalSize; |
| |
| if (logicalRatio > DefaultMessageStore.this.getMessageStoreConfig().getLogicalDiskSpaceCleanForciblyThreshold()) { |
| // if logical ratio exceeds 0.80, then clean immediately |
| DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds logical disk space clean forcibly threshold {}, forcibly: {}", |
| logicalRatio, minPhysicRatio, cleanImmediately); |
| cleanImmediately = true; |
| return true; |
| } |
| |
| boolean isUsageExceedsThreshold = logicalRatio > ratio; |
| if (isUsageExceedsThreshold) { |
| DefaultMessageStore.LOGGER.info("Logical disk usage {} exceeds clean threshold {}, forcibly: {}", |
| logicalRatio, ratio, cleanImmediately); |
| } |
| return isUsageExceedsThreshold; |
| } |
| } |
| |
| public int getManualDeleteFileSeveralTimes() { |
| return manualDeleteFileSeveralTimes; |
| } |
| |
| public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { |
| this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; |
| } |
| |
| public double calcStorePathPhysicRatio() { |
| Set<String> fullStorePath = new HashSet<>(); |
| String storePath = getStorePathPhysic(); |
| String[] paths = storePath.trim().split(MixAll.MULTI_PATH_SPLITTER); |
| double minPhysicRatio = 100; |
| for (String path : paths) { |
| double physicRatio = UtilAll.isPathExists(path) ? |
| UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1; |
| minPhysicRatio = Math.min(minPhysicRatio, physicRatio); |
| if (physicRatio > getDiskSpaceCleanForciblyRatio()) { |
| fullStorePath.add(path); |
| } |
| } |
| DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); |
| return minPhysicRatio; |
| |
| } |
| |
| public boolean isSpaceFull() { |
| double physicRatio = calcStorePathPhysicRatio(); |
| double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; |
| if (physicRatio > ratio) { |
| DefaultMessageStore.LOGGER.info("physic disk of commitLog used: " + physicRatio); |
| } |
| if (physicRatio > this.getDiskSpaceWarningLevelRatio()) { |
| boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); |
| if (diskok) { |
| DefaultMessageStore.LOGGER.error("physic disk of commitLog maybe full soon, used " + physicRatio + ", so mark disk full"); |
| } |
| |
| return true; |
| } else { |
| boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); |
| |
| if (!diskok) { |
| DefaultMessageStore.LOGGER.info("physic disk space of commitLog OK " + physicRatio + ", so mark disk ok"); |
| } |
| |
| return false; |
| } |
| } |
| } |
| |
| class CleanConsumeQueueService { |
| private long lastPhysicalMinOffset = 0; |
| |
| public void run() { |
| try { |
| this.deleteExpiredFiles(); |
| } catch (Throwable e) { |
| DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e); |
| } |
| } |
| |
| private void deleteExpiredFiles() { |
| int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); |
| |
| long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); |
| if (minOffset > this.lastPhysicalMinOffset) { |
| this.lastPhysicalMinOffset = minOffset; |
| |
| ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable(); |
| |
| for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) { |
| for (ConsumeQueueInterface logic : maps.values()) { |
| int deleteCount = DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minOffset); |
| if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { |
| try { |
| Thread.sleep(deleteLogicsFilesInterval); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| } |
| } |
| |
| DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); |
| } |
| } |
| |
| public String getServiceName() { |
| return DefaultMessageStore.this.brokerConfig.getLoggerIdentifier() + CleanConsumeQueueService.class.getSimpleName(); |
| } |
| } |
| |
    class CorrectLogicOffsetService {
        // Last time a force-correct pass ran; -1 means never. Updated inside needCorrect().
        private long lastForceCorrectTime = -1L;

        /** One scheduled pass; any failure is logged and swallowed. */
        public void run() {
            try {
                this.correctLogicMinOffset();
            } catch (Throwable e) {
                LOGGER.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        /**
         * Decides whether the given consume queue's minimum offset must be
         * corrected against the commit log's minimum physical offset.
         * Side effect: refreshes {@link #lastForceCorrectTime} when the
         * force-correct interval has elapsed.
         *
         * @param logic the consume queue under inspection (null -> no correction)
         * @param minPhyOffset current minimum physical offset of the commit log
         * @param lastForeCorrectTimeCurRun snapshot of lastForceCorrectTime taken
         *        at the start of this run, so all queues share one interval check
         */
        private boolean needCorrect(ConsumeQueueInterface logic, long minPhyOffset, long lastForeCorrectTimeCurRun) {
            if (logic == null) {
                return false;
            }
            // If first exist and not available, it means first file may destroy failed, delete it.
            if (DefaultMessageStore.this.consumeQueueStore.isFirstFileExist(logic) && !DefaultMessageStore.this.consumeQueueStore.isFirstFileAvailable(logic)) {
                LOGGER.error("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct." +
                    " topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset " +
                    "in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}"
                    , logic.getTopic(), logic.getQueueId(), logic.getMaxPhysicOffset()
                    , minPhyOffset, logic.getMinOffsetInQueue(), logic.getMaxOffsetInQueue(), logic.getCQType());
                return true;
            }

            // logic.getMaxPhysicOffset() or minPhyOffset = -1
            // means there is no message in current queue, so no need to correct.
            if (logic.getMaxPhysicOffset() == -1 || minPhyOffset == -1) {
                return false;
            }

            // Queue's newest entry is older than the oldest commit-log data:
            // the whole queue points at expired data.
            if (logic.getMaxPhysicOffset() < minPhyOffset) {
                if (logic.getMinOffsetInQueue() < logic.getMaxOffsetInQueue()) {
                    LOGGER.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is less than min phy offset: {}, " +
                        "but min offset: {} is less than max offset: {}. topic:{}, queue:{}, cqType:{}."
                        , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue()
                        , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
                    return true;
                } else if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) {
                    // Empty queue — nothing to correct.
                    return false;
                } else {
                    // min > max should be impossible; log and leave it alone.
                    LOGGER.error("CorrectLogicOffsetService.needCorrect. It should not happen, logic max phy offset: {} is less than min phy offset: {}," +
                        " but min offset: {} is larger than max offset: {}. topic:{}, queue:{}, cqType:{}"
                        , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue()
                        , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
                    return false;
                }
            }
            //the logic.getMaxPhysicOffset() >= minPhyOffset
            // Only do the (more expensive) earliest-unit check at the configured
            // force-correct interval.
            int forceCorrectInterval = DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetForceInterval();
            if ((System.currentTimeMillis() - lastForeCorrectTimeCurRun) > forceCorrectInterval) {
                lastForceCorrectTime = System.currentTimeMillis();
                CqUnit cqUnit = logic.getEarliestUnit();
                if (cqUnit == null) {
                    if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) {
                        // Empty queue — consistent with having no earliest unit.
                        return false;
                    } else {
                        LOGGER.error("CorrectLogicOffsetService.needCorrect. cqUnit is null, logic max phy offset: {} is greater than min phy offset: {}, " +
                            "but min offset: {} is not equal to max offset: {}. topic:{}, queue:{}, cqType:{}."
                            , logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue()
                            , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
                        return true;
                    }
                }

                // Earliest queue entry points below the commit log's min offset:
                // the head of the queue references expired data.
                if (cqUnit.getPos() < minPhyOffset) {
                    LOGGER.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is greater than min phy offset: {}, " +
                        "but minPhyPos in cq is: {}. min offset in queue: {}, max offset in queue: {}, topic:{}, queue:{}, cqType:{}."
                        , logic.getMaxPhysicOffset(), minPhyOffset, cqUnit.getPos(), logic.getMinOffsetInQueue()
                        , logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
                    return true;
                }

                if (cqUnit.getPos() >= minPhyOffset) {

                    // Normal case, do not need correct.
                    return false;
                }
            }

            return false;
        }

        /**
         * Scans every consume queue (SimpleCQ excluded) and corrects those whose
         * minimum offset falls behind the commit log's minimum physical offset.
         */
        private void correctLogicMinOffset() {

            long lastForeCorrectTimeCurRun = lastForceCorrectTime;
            long minPhyOffset = getMinPhyOffset();
            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable();
            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
                for (ConsumeQueueInterface logic : maps.values()) {
                    if (Objects.equals(CQType.SimpleCQ, logic.getCQType())) {
                        // cq is not supported for now.
                        continue;
                    }
                    if (needCorrect(logic, minPhyOffset, lastForeCorrectTimeCurRun)) {
                        doCorrect(logic, minPhyOffset);
                    }
                }
            }
        }

        /**
         * Performs the correction by deleting queue files below minPhyOffset,
         * then optionally sleeps to throttle the I/O.
         */
        private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) {
            DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minPhyOffset);
            int sleepIntervalWhenCorrectMinOffset = DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetSleepInterval();
            if (sleepIntervalWhenCorrectMinOffset > 0) {
                try {
                    Thread.sleep(sleepIntervalWhenCorrectMinOffset);
                } catch (InterruptedException ignored) {
                }
            }
        }

        public String getServiceName() {
            if (brokerConfig.isInBrokerContainer()) {
                return brokerConfig.getLoggerIdentifier() + CorrectLogicOffsetService.class.getSimpleName();
            }
            return CorrectLogicOffsetService.class.getSimpleName();
        }
    }
| |
| class FlushConsumeQueueService extends ServiceThread { |
| private static final int RETRY_TIMES_OVER = 3; |
| private long lastFlushTimestamp = 0; |
| |
| private void doFlush(int retryTimes) { |
| int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); |
| |
| if (retryTimes == RETRY_TIMES_OVER) { |
| flushConsumeQueueLeastPages = 0; |
| } |
| |
| long logicsMsgTimestamp = 0; |
| |
| int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval(); |
| long currentTimeMillis = System.currentTimeMillis(); |
| if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { |
| this.lastFlushTimestamp = currentTimeMillis; |
| flushConsumeQueueLeastPages = 0; |
| logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); |
| } |
| |
| ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable(); |
| |
| for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) { |
| for (ConsumeQueueInterface cq : maps.values()) { |
| boolean result = false; |
| for (int i = 0; i < retryTimes && !result; i++) { |
| result = DefaultMessageStore.this.consumeQueueStore.flush(cq, flushConsumeQueueLeastPages); |
| } |
| } |
| } |
| |
| if (0 == flushConsumeQueueLeastPages) { |
| if (logicsMsgTimestamp > 0) { |
| DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); |
| } |
| DefaultMessageStore.this.getStoreCheckpoint().flush(); |
| } |
| } |
| |
| @Override |
| public void run() { |
| DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started"); |
| |
| while (!this.isStopped()) { |
| try { |
| int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); |
| this.waitForRunning(interval); |
| this.doFlush(1); |
| } catch (Exception e) { |
| DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e); |
| } |
| } |
| |
| this.doFlush(RETRY_TIMES_OVER); |
| |
| DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end"); |
| } |
| |
| @Override |
| public String getServiceName() { |
| if (DefaultMessageStore.this.brokerConfig.isInBrokerContainer()) { |
| return DefaultMessageStore.this.getBrokerIdentity().getLoggerIdentifier() + FlushConsumeQueueService.class.getSimpleName(); |
| } |
| return FlushConsumeQueueService.class.getSimpleName(); |
| } |
| |
| @Override |
| public long getJoinTime() { |
| return 1000 * 60; |
| } |
| } |
| |
    class ReputMessageService extends ServiceThread {

        // Next commit-log physical offset to dispatch; advanced as entries are consumed.
        private volatile long reputFromOffset = 0;

        public long getReputFromOffset() {
            return reputFromOffset;
        }

        public void setReputFromOffset(long reputFromOffset) {
            this.reputFromOffset = reputFromOffset;
        }

        /**
         * Waits up to ~5s (50 x 100ms) for the remaining commit log to be
         * dispatched, warns if it did not drain, then stops the thread.
         */
        @Override
        public void shutdown() {
            for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException ignored) {
                }
            }

            if (this.isCommitLogAvailable()) {
                LOGGER.warn("shutdown ReputMessageService, but CommitLog have not finish to be dispatched, CommitLog max" +
                    " offset={}, reputFromOffset={}", DefaultMessageStore.this.commitLog.getMaxOffset(),
                    this.reputFromOffset);
            }

            super.shutdown();
        }

        /** Bytes between the commit log's confirm offset and the dispatch position. */
        public long behind() {
            return DefaultMessageStore.this.commitLog.getConfirmOffset() - this.reputFromOffset;
        }

        /**
         * Whether commit-log data is still waiting to be dispatched. With
         * duplication enabled the bound is the confirm offset, otherwise the
         * commit log's max offset.
         */
        private boolean isCommitLogAvailable() {
            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()) {
                return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset();
            }
            return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
        }

        /**
         * Reads entries from the commit log starting at reputFromOffset and
         * dispatches each one (consume queue, index, listeners), advancing the
         * offset as it goes. Stops when no more data is available or a
         * dispatch failure makes further progress unsafe.
         */
        private void doReput() {
            // If dispatch fell behind far enough that the data expired, jump
            // forward to the oldest data still present.
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                    this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }

                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        // Walk the mapped buffer message by message.
                        for (int readSize = 0; readSize < result.getSize() && reputFromOffset <= DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false, false);
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

                                    // Wake long-polling consumers waiting on this queue.
                                    if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                        && DefaultMessageStore.this.messageArrivingListener != null) {
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                        notifyMessageArrive4MultiQueue(dispatchRequest);
                                    }

                                    this.reputFromOffset += size;
                                    readSize += size;
                                    if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() &&
                                        DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(dispatchRequest.getBatchSize());
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                            .add(dispatchRequest.getMsgSize());
                                    }
                                } else if (size == 0) {
                                    // End-of-file marker: continue at the next commit-log file.
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {

                                if (size > 0) {
                                    LOGGER.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                } else {
                                    doNext = false;
                                    // If user open the dledger pattern or the broker is master node,
                                    // it will not ignore the exception and fix the reputFromOffset variable
                                    if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                        DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                        LOGGER.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                            this.reputFromOffset);
                                        this.reputFromOffset += result.getSize() - readSize;
                                    }
                                }
                            }
                        }
                    } finally {
                        result.release();
                    }
                } else {
                    // No more data to read.
                    doNext = false;
                }
            }
        }

        /**
         * For messages that were multi-dispatched to several queues (LMQ),
         * notifies the arriving listener once per target queue using the
         * per-queue offsets carried in the message properties.
         */
        private void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) {
            Map<String, String> prop = dispatchRequest.getPropertiesMap();
            if (prop == null) {
                return;
            }
            String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
            String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
            if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) {
                return;
            }
            String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
            String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
            // Malformed properties: queue list and offset list must pair up.
            if (queues.length != queueOffsets.length) {
                return;
            }
            for (int i = 0; i < queues.length; i++) {
                String queueName = queues[i];
                long queueOffset = Long.parseLong(queueOffsets[i]);
                int queueId = dispatchRequest.getQueueId();
                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
                    queueId = 0;
                }
                DefaultMessageStore.this.messageArrivingListener.arriving(
                    queueName, queueId, queueOffset + 1, dispatchRequest.getTagsCode(),
                    dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
            }
        }

        /** Main loop: poll the commit log for new data roughly every millisecond. */
        @Override
        public void run() {
            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) {
                return DefaultMessageStore.this.getBrokerIdentity().getLoggerIdentifier() + ReputMessageService.class.getSimpleName();
            }
            return ReputMessageService.class.getSimpleName();
        }

    }
| |
| @Override |
| public HARuntimeInfo getHARuntimeInfo() { |
| if (haService != null) { |
| return this.haService.getRuntimeInfo(this.commitLog.getMaxOffset()); |
| } else { |
| return null; |
| } |
| } |
| |
| public int getMaxDelayLevel() { |
| return maxDelayLevel; |
| } |
| |
| public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { |
| Long time = this.delayLevelTable.get(delayLevel); |
| if (time != null) { |
| return time + storeTimestamp; |
| } |
| |
| return storeTimestamp + 1000; |
| } |
| |
| public List<PutMessageHook> getPutMessageHookList() { |
| return putMessageHookList; |
| } |
| |
| @Override public void setSendMessageBackHook(SendMessageBackHook sendMessageBackHook) { |
| this.sendMessageBackHook = sendMessageBackHook; |
| } |
| |
| @Override public SendMessageBackHook getSendMessageBackHook() { |
| return sendMessageBackHook; |
| } |
| |
| @Override public boolean isShutdown() { |
| return shutdown; |
| } |
| } |