blob: 79b745d89cb8df7259a08ecca602c5a0782b7c22 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.queue;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.logfile.MappedFile;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
public class SparseConsumeQueue extends BatchConsumeQueue {
/**
 * Creates a sparse consume queue. Every argument is handed unchanged to the
 * five-argument {@link BatchConsumeQueue} constructor.
 *
 * @param topic               topic forwarded to the superclass
 * @param queueId             queue id forwarded to the superclass
 * @param storePath           store path forwarded to the superclass
 * @param mappedFileSize      mapped file size forwarded to the superclass
 * @param defaultMessageStore message store forwarded to the superclass
 */
public SparseConsumeQueue(final String topic, final int queueId, final String storePath,
    final int mappedFileSize, final MessageStore defaultMessageStore) {
    super(topic, queueId, storePath, mappedFileSize, defaultMessageStore);
}
/**
 * Creates a sparse consume queue with an additional subfolder argument. Every
 * argument is handed unchanged to the six-argument {@link BatchConsumeQueue}
 * constructor.
 *
 * @param topic               topic forwarded to the superclass
 * @param queueId             queue id forwarded to the superclass
 * @param storePath           store path forwarded to the superclass
 * @param mappedFileSize      mapped file size forwarded to the superclass
 * @param defaultMessageStore message store forwarded to the superclass
 * @param subfolder           subfolder forwarded to the superclass
 */
public SparseConsumeQueue(final String topic, final int queueId, final String storePath,
    final int mappedFileSize, final MessageStore defaultMessageStore, final String subfolder) {
    super(topic, queueId, storePath, mappedFileSize, defaultMessageStore, subfolder);
}
/**
 * Re-scans the tail of the mapped file queue and restores write/flush/commit positions.
 *
 * <p>Scanning starts at the third file from the end (or the first file when fewer
 * exist). Each file is read unit by unit until the first unit that fails the validity
 * check ({@code offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0});
 * the file's positions are then set to the end of its last valid unit and scanning
 * moves on to the next file. After the last file, the queue-level flushed/committed
 * offsets are set, dirty files are truncated and the min/max queue offsets are revised.
 *
 * <p>Only slots that can hold a complete unit are visited, so a file size that is not a
 * multiple of {@code CQ_STORE_UNIT_SIZE} no longer triggers a
 * {@link java.nio.BufferUnderflowException} on the trailing partial slot.
 */
@Override
public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        int index = mappedFiles.size() - 3;
        if (index < 0) {
            index = 0;
        }
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        int mappedFileOffset = 0;
        long processOffset = mappedFile.getFileFromOffset();
        // Highest slot start that still leaves room for one complete unit.
        final int lastUnitStart = mappedFileSize - CQ_STORE_UNIT_SIZE;
        while (true) {
            for (int i = 0; i <= lastUnitStart; i += CQ_STORE_UNIT_SIZE) {
                byteBuffer.position(i);
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                byteBuffer.getLong(); //tagscode
                byteBuffer.getLong(); //timestamp
                long msgBaseOffset = byteBuffer.getLong();
                short batchSize = byteBuffer.getShort();
                if (offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0) {
                    mappedFileOffset += CQ_STORE_UNIT_SIZE;
                    this.maxMsgPhyOffsetInCommitLog = offset;
                } else {
                    log.info("Recover current batch consume queue file over, " + "file:{} offset:{} size:{} msgBaseOffset:{} batchSize:{} mappedFileOffset:{}",
                        mappedFile.getFileName(), offset, size, msgBaseOffset, batchSize, mappedFileOffset);
                    // Stop at the first invalid unit; the remaining files are still scanned below.
                    break;
                }
            }
            // Applies both when an invalid unit stopped the scan and when the file
            // ends with a partial slot; skipped only when valid units fill the file.
            if (mappedFileOffset != mappedFileSize) {
                mappedFile.setWrotePosition(mappedFileOffset);
                mappedFile.setFlushedPosition(mappedFileOffset);
                mappedFile.setCommittedPosition(mappedFileOffset);
            }
            index++;
            if (index >= mappedFiles.size()) {
                log.info("Recover last batch consume queue file over, last mapped file:{} ", mappedFile.getFileName());
                break;
            } else {
                mappedFile = mappedFiles.get(index);
                byteBuffer = mappedFile.sliceByteBuffer();
                processOffset = mappedFile.getFileFromOffset();
                mappedFileOffset = 0;
                log.info("Recover next batch consume queue file: " + mappedFile.getFileName());
            }
        }
        // processOffset is the start offset of the last scanned file plus its valid bytes.
        processOffset += mappedFileOffset;
        mappedFileQueue.setFlushedWhere(processOffset);
        mappedFileQueue.setCommittedWhere(processOffset);
        mappedFileQueue.truncateDirtyFiles(processOffset);
        reviseMaxAndMinOffsetInQueue();
    }
}
/**
 * Builds an iterator over the buffer returned by
 * {@link #getBatchMsgIndexOrNextBuffer(long)} for the given offset.
 *
 * @param startOffset offset passed to the buffer lookup
 * @return an iterator wrapping the located buffer, or {@code null} when the lookup
 *         returned no buffer
 */
public ReferredIterator<CqUnit> iterateFromOrNext(long startOffset) {
    final SelectMappedBufferResult located = getBatchMsgIndexOrNextBuffer(startOffset);
    return located == null ? null : new BatchConsumeQueueIterator(located);
}
/**
 * Gets the SelectMappedBufferResult for a batch-message offset; if the offset is not
 * found, the buffer of the next valid offset is returned instead.
 * Note: the caller is responsible for releasing the returned SelectMappedBufferResult.
 *
 * @param msgOffset the message offset to look up; values at or below
 *                  {@code minOffsetInQueue} are resolved against the first mapped file
 * @return the buffer positioned by the binary search, or {@code null} when no file,
 *         no valid min/max unit, no readable buffer or no matching position exists
 */
public SelectMappedBufferResult getBatchMsgIndexOrNextBuffer(final long msgOffset) {
    final MappedFile targetBcq;
    if (msgOffset <= minOffsetInQueue) {
        targetBcq = mappedFileQueue.getFirstMappedFile();
    } else {
        targetBcq = searchFileByOffsetOrRight(msgOffset);
    }
    if (targetBcq == null) {
        return null;
    }

    BatchOffsetIndex minOffset = getMinMsgOffset(targetBcq, false, false);
    BatchOffsetIndex maxOffset = getMaxMsgOffset(targetBcq, false, false);
    if (null == minOffset || null == maxOffset) {
        return null;
    }

    // Temporary view used only for the binary search; it is released in the finally block.
    SelectMappedBufferResult sbr = minOffset.getMappedFile().selectMappedBuffer(0);
    if (sbr == null) {
        // No buffer could be selected; without this guard both the search and the
        // release in the finally block would dereference null.
        return null;
    }
    try {
        ByteBuffer byteBuffer = sbr.getByteBuffer();
        int left = minOffset.getIndexPos();
        int right = maxOffset.getIndexPos();
        int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset);
        if (mid != -1) {
            // A second, independent selection is handed to the caller, who must release it.
            return minOffset.getMappedFile().selectMappedBuffer(mid);
        }
    } finally {
        sbr.release();
    }
    return null;
}
/**
 * Looks up the offset cache for the entry with the smallest key that is greater
 * than or equal to {@code msgOffset}.
 *
 * @param msgOffset the message offset used as the ceiling key
 * @return the mapped file stored under that entry, or {@code null} when the cache
 *         has no such entry
 */
protected MappedFile searchOffsetFromCacheOrRight(long msgOffset) {
    final Map.Entry<Long, MappedFile> hit = this.offsetCache.ceilingEntry(msgOffset);
    return hit != null ? hit.getValue() : null;
}
/**
 * Finds the mapped file for {@code msgOffset}, going through the offset cache when
 * the store configuration enables it and through the file scan otherwise.
 *
 * <p>With the cache enabled, a cache miss falls back to the file scan only when the
 * offset lies between the minimum offset of the first file (inclusive) and
 * {@code maxOffsetInQueue} (exclusive); every cache miss is logged as a warning.
 *
 * @param msgOffset the message offset to locate
 * @return the located mapped file, or {@code null} when none was found
 */
protected MappedFile searchFileByOffsetOrRight(long msgOffset) {
    final boolean cacheLookupEnabled = this.messageStore.getMessageStoreConfig().isSearchBcqByCacheEnable();
    if (!cacheLookupEnabled) {
        // Cache disabled: scan the files directly.
        return this.searchOffsetFromFilesOrRight(msgOffset);
    }

    MappedFile located = this.searchOffsetFromCacheOrRight(msgOffset);
    if (located != null) {
        return located;
    }

    // Cache miss: decide whether the offset is inside the queue's range at all.
    final MappedFile headFile = mappedFileQueue.getFirstMappedFile();
    final BatchOffsetIndex headMin = getMinMsgOffset(headFile, false, false);
    final boolean insideQueueRange = headMin != null
        && headMin.getMsgOffset() <= msgOffset
        && msgOffset < maxOffsetInQueue;
    if (insideQueueRange) {
        located = this.searchOffsetFromFilesOrRight(msgOffset);
    }
    log.warn("cache is not working on BCQ [Topic: {}, QueueId: {}] for msgOffset: {}, targetBcq: {}", this.topic, this.queueId, msgOffset, located);
    return located;
}
/**
 * Scans the mapped files from last to first looking for the file that covers
 * {@code msgOffset}, or the file immediately to its right.
 *
 * <p>For each file with a valid maximum unit: when that maximum is below
 * {@code msgOffset} and the file is not the last one, the following file is
 * returned; when {@code msgOffset} lies within the file's [min, max] range, the file
 * itself is returned. Files without a valid maximum unit are skipped.
 *
 * @param msgOffset the message offset to locate
 * @return the matching mapped file, or {@code null} when the scan finds none
 */
public MappedFile searchOffsetFromFilesOrRight(long msgOffset) {
    final int fileCount = this.mappedFileQueue.getMappedFiles().size();
    final int lastIdx = fileCount - 1;
    for (int idx = lastIdx; idx >= 0; idx--) {
        final MappedFile candidate = mappedFileQueue.getMappedFiles().get(idx);
        final BatchOffsetIndex lower = getMinMsgOffset(candidate, false, false);
        final BatchOffsetIndex upper = getMaxMsgOffset(candidate, false, false);
        if (upper == null) {
            // Neither rule can apply without a valid maximum unit.
            continue;
        }
        if (upper.getMsgOffset() < msgOffset) {
            if (idx != lastIdx) {
                // The offset falls after this file, so the file to its right is the answer.
                return mappedFileQueue.getMappedFiles().get(idx + 1);
            }
            // Last file with a maximum below the offset: keep scanning the earlier files.
            continue;
        }
        // Here msgOffset <= maximum of this file; it matches when it is also >= the minimum.
        if (lower != null && lower.getMsgOffset() <= msgOffset) {
            return candidate;
        }
    }
    return null;
}
/**
 * Returns the mapped file that precedes {@code file} in the mapped file list.
 *
 * @param file the file whose predecessor is wanted
 * @return the preceding file, or {@code null} when {@code file} is the first entry
 *         or is not in the list
 */
private MappedFile getPreFile(MappedFile file) {
    final int position = mappedFileQueue.getMappedFiles().indexOf(file);
    // indexOf yields -1 when absent and 0 for the first file; neither has a predecessor.
    return position >= 1 ? mappedFileQueue.getMappedFiles().get(position - 1) : null;
}
/**
 * Computes an offset index for {@code file} and records it in both the offset cache
 * (keyed by message offset) and the time cache (keyed by store timestamp).
 *
 * <p>Caching is best effort: any exception is logged, including its stack trace,
 * and not rethrown.
 *
 * @param file          the mapped file to index
 * @param offsetGetFunc produces the index to cache; a {@code null} result caches nothing
 */
private void cacheOffset(MappedFile file, Function<MappedFile, BatchOffsetIndex> offsetGetFunc) {
    try {
        BatchOffsetIndex offset = offsetGetFunc.apply(file);
        if (offset != null) {
            this.offsetCache.put(offset.getMsgOffset(), offset.getMappedFile());
            this.timeCache.put(offset.getStoreTimestamp(), offset.getMappedFile());
        }
    } catch (Exception e) {
        // Pass the exception as the trailing argument so the cause is not lost.
        log.error("Failed caching offset and time on BCQ [Topic: {}, QueueId: {}, File: {}]",
            this.topic, this.queueId, file, e);
    }
}
/**
 * Caches the maximum offset index of the file that precedes {@code bcq} in the
 * mapped file list; does nothing when there is no preceding file.
 *
 * @param bcq the file whose predecessor should be cached
 */
@Override
protected void cacheBcq(MappedFile bcq) {
    final MappedFile previous = getPreFile(bcq);
    if (previous == null) {
        return;
    }
    cacheOffset(previous, f -> getMaxMsgOffset(f, false, true));
}
/**
 * Appends an end-marker unit to {@code mappedFile} when it is not full, then caches
 * the file's maximum offset index.
 *
 * <p>The marker is written with offset {@code -1} and size/batch size {@code 0}, so it
 * fails the validity check applied when units are scanned in this class.
 *
 * @param mappedFile the file to terminate and cache
 */
public void putEndPositionInfo(MappedFile mappedFile) {
    if (!mappedFile.isFull()) {
        // Rewind the shared unit buffer and open it up to exactly one unit.
        this.byteBufferItem.flip();
        this.byteBufferItem.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferItem
            .putLong(-1)           // offset
            .putInt(0)             // size
            .putLong(0)            // tagscode
            .putLong(0)            // timestamp
            .putLong(0)            // msgBaseOffset
            .putShort((short)0)    // batchSize
            .putInt(INVALID_POS)
            .putInt(0);            // 4 bytes reserved
        if (!mappedFile.appendMessage(this.byteBufferItem.array())) {
            log.error("append end position info into {} failed", mappedFile.getFileName());
        }
    }
    // The maximum offset is cached whether or not a marker was appended.
    cacheOffset(mappedFile, f -> getMaxMsgOffset(f, false, true));
}
/**
 * Asks the mapped file queue to create a mapped file for {@code physicalOffset}.
 *
 * @param physicalOffset offset handed to {@code tryCreateMappedFile}
 * @return whatever the mapped file queue returns for that offset
 * @throws IOException if the mapped file queue fails to create the file
 */
public MappedFile createFile(final long physicalOffset) throws IOException {
    final MappedFile created = mappedFileQueue.tryCreateMappedFile(physicalOffset);
    return created;
}
/**
 * Reports whether the last mapped file is full.
 *
 * @return {@code true} when there is no last mapped file or when it reports itself
 *         full; {@code false} otherwise
 */
public boolean isLastFileFull() {
    return mappedFileQueue.getLastMappedFile() == null
        || mappedFileQueue.getLastMappedFile().isFull();
}
/**
 * Decides whether a new mapped file is needed before the next unit is written.
 *
 * @return {@code true} when there is no last mapped file, when it is full, or when
 *         its write position plus one unit would exceed the mapped file size;
 *         {@code false} otherwise
 */
public boolean shouldRoll() {
    return mappedFileQueue.getLastMappedFile() == null
        || mappedFileQueue.getLastMappedFile().isFull()
        // Not enough room left for one more complete unit.
        || mappedFileQueue.getLastMappedFile().getWrotePosition() + BatchConsumeQueue.CQ_STORE_UNIT_SIZE
            > mappedFileQueue.getMappedFileSize();
}
/**
 * Checks whether a mapped file exists whose file name equals the name derived from
 * {@code physicalOffset} via {@code UtilAll.offset2FileName}.
 *
 * @param physicalOffset offset converted into the file name to look for
 * @return {@code true} when a mapped file with that name exists
 */
public boolean containsOffsetFile(final long physicalOffset) {
    final String expectedName = UtilAll.offset2FileName(physicalOffset);
    for (MappedFile candidate : mappedFileQueue.getMappedFiles()) {
        if (Objects.equals(candidate.getFile().getName(), expectedName)) {
            return true;
        }
    }
    return false;
}
/**
 * Computes a physical offset from the last mapped file by applying a reader to the
 * buffer supplied by {@code getMax}.
 *
 * <p>The reader adds the long at absolute index 0 and the int at absolute index 8 of
 * the buffer it receives.
 * NOTE(review): these are absolute indices, so which unit they address depends on the
 * buffer {@code getMax} passes in; confirm it starts at the intended unit.
 *
 * @return the computed value, or {@code -1} when {@code getMax} yields nothing
 */
public long getMaxPhyOffsetInLog() {
    final Long computed = getMax(mappedFileQueue.getLastMappedFile(), b -> b.getLong(0) + b.getInt(8));
    if (computed == null) {
        return -1;
    }
    return computed;
}
/**
 * Walks the units of {@code mappedFile} backwards from its read position and applies
 * {@code function} to the last unit that passes the validity check
 * ({@code offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0}).
 *
 * <p>The function receives a slice that begins at the matched unit, so both relative
 * reads and absolute reads (index 0 is the first byte of the unit) address that unit.
 * Passing the un-sliced buffer would make absolute reads hit the start of the file.
 *
 * @param mappedFile the file to inspect; may be {@code null}
 * @param function   extracts the result from the matched unit
 * @param <T>        result type
 * @return the function's result, or {@code null} when the file is {@code null}, holds
 *         less than one unit, or contains no valid unit
 */
private <T> T getMax(MappedFile mappedFile, Function<ByteBuffer, T> function) {
    if (mappedFile == null || mappedFile.getReadPosition() < CQ_STORE_UNIT_SIZE) {
        return null;
    }
    ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    for (int i = mappedFile.getReadPosition() - CQ_STORE_UNIT_SIZE; i >= 0; i -= CQ_STORE_UNIT_SIZE) {
        byteBuffer.position(i);
        long offset = byteBuffer.getLong();
        int size = byteBuffer.getInt();
        byteBuffer.getLong(); //tagscode
        byteBuffer.getLong(); //timestamp
        long msgBaseOffset = byteBuffer.getLong();
        short batchSize = byteBuffer.getShort();
        if (offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0) {
            byteBuffer.position(i); //reset position to the start of the matched unit
            // slice() resets the byte order, so carry the source buffer's order over.
            return function.apply(byteBuffer.slice().order(byteBuffer.order()));
        }
    }
    return null;
}
/**
 * Finds the last valid unit of {@code mappedFile}, scanning backwards from its read
 * position, and returns its index information.
 *
 * <p>A unit is valid when
 * {@code offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0}. The batch
 * size and timestamp are always read; the two boolean flags are not consulted here.
 * The file's write/flush/commit positions are left untouched.
 *
 * @param mappedFile   the file to inspect; may be {@code null}
 * @param getBatchSize not used by this implementation
 * @param getStoreTime not used by this implementation
 * @return the index of the last valid unit, or {@code null} when the file is
 *         {@code null}, holds less than one unit, or contains no valid unit
 */
@Override
protected BatchOffsetIndex getMaxMsgOffset(MappedFile mappedFile, boolean getBatchSize, boolean getStoreTime) {
    if (mappedFile == null) {
        return null;
    }
    if (mappedFile.getReadPosition() < CQ_STORE_UNIT_SIZE) {
        return null;
    }

    final ByteBuffer units = mappedFile.sliceByteBuffer();
    int pos = mappedFile.getReadPosition() - CQ_STORE_UNIT_SIZE;
    while (pos >= 0) {
        units.position(pos);
        final long phyOffset = units.getLong();
        final int unitSize = units.getInt();
        units.getLong(); // tagscode, skipped
        final long storeTime = units.getLong();
        final long baseOffset = units.getLong();
        final short batch = units.getShort();

        final boolean valid = phyOffset >= 0 && unitSize > 0 && baseOffset >= 0 && batch > 0;
        if (valid) {
            return new BatchOffsetIndex(mappedFile, pos, baseOffset, batch, storeTime);
        }
        pos -= CQ_STORE_UNIT_SIZE;
    }
    return null;
}
/**
 * Returns the message offset of the last valid unit in the first mapped file whose
 * file name equals {@code simpleFileName}.
 *
 * @param simpleFileName the file name to match against each mapped file's name
 * @return the maximum message offset of that file, or {@code -1} when no file has
 *         that name or the file has no valid unit
 */
public long getMaxMsgOffsetFromFile(String simpleFileName) {
    for (MappedFile candidate : mappedFileQueue.getMappedFiles()) {
        if (!Objects.equals(candidate.getFile().getName(), simpleFileName)) {
            continue;
        }
        // Only the first file with a matching name is considered.
        final BatchOffsetIndex max = getMaxMsgOffset(candidate, false, false);
        if (max == null) {
            return -1;
        }
        return max.getMsgOffset();
    }
    return -1;
}
/**
 * Refreshes the cache via {@code doRefreshCache}, supplying each file's maximum
 * offset index (read through {@code getMaxMsgOffset(file, false, true)}).
 */
private void refreshMaxCache() {
    doRefreshCache(file -> getMaxMsgOffset(file, false, true));
}
/**
 * Refreshes the cache by delegating to {@code refreshMaxCache()}.
 */
@Override
protected void refreshCache() {
    this.refreshMaxCache();
}
/**
 * Revises the queue's max and min offsets, then refreshes the cache, in that order.
 */
public void refresh() {
    this.reviseMaxAndMinOffsetInQueue();
    this.refreshCache();
}
}