/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.kv;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.CompactionAppendMsgCallback;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageLock;
import org.apache.rocketmq.store.PutMessageReentrantLock;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageSpinLock;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.StoreUtil;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.queue.BatchConsumeQueue;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.queue.SparseConsumeQueue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.security.DigestException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.rocketmq.store.CommitLog.BLANK_MAGIC_CODE;
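/**
 * Per-topic, per-queue compaction log: a compacted view of the CommitLog that,
 * much like a Kafka compacted topic, keeps only the latest message for each
 * message key. Appends are indexed through a {@link SparseConsumeQueue}; a
 * background round ({@link #doCompaction()}) rewrites sealed files without the
 * superseded keys and swaps the rewritten files back in.
 */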
public class CompactionLog {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
private static final int MAX_PULL_MSG_SIZE = 128 * 1024 * 1024;
public static final String COMPACTING_SUB_FOLDER = "compacting";
public static final String REPLICATING_SUB_FOLDER = "replicating";
private final int compactionLogMappedFileSize;
private final int compactionCqMappedFileSize;
private final String compactionLogFilePath;
private final String compactionCqFilePath;
private final MessageStore defaultMessageStore;
private final CompactionStore compactionStore;
private final MessageStoreConfig messageStoreConfig;
private final CompactionAppendMsgCallback endMsgCallback;
private final String topic;
private final int queueId;
private final int offsetMapMemorySize;
private final PutMessageLock putMessageLock;
private final PutMessageLock readMessageLock;
private TopicPartitionLog current;
private TopicPartitionLog compacting;
private TopicPartitionLog replicating;
private CompactionPositionMgr positionMgr;
private AtomicReference<State> state;
public CompactionLog(final MessageStore messageStore, final CompactionStore compactionStore, final String topic, final int queueId)
throws IOException {
this.topic = topic;
this.queueId = queueId;
this.defaultMessageStore = messageStore;
this.compactionStore = compactionStore;
this.messageStoreConfig = messageStore.getMessageStoreConfig();
this.offsetMapMemorySize = compactionStore.getOffsetMapSize();
this.compactionCqMappedFileSize =
messageStoreConfig.getCompactionCqMappedFileSize() / BatchConsumeQueue.CQ_STORE_UNIT_SIZE
* BatchConsumeQueue.CQ_STORE_UNIT_SIZE;
this.compactionLogMappedFileSize = getCompactionLogSize(compactionCqMappedFileSize,
messageStoreConfig.getCompactionMappedFileSize());
this.compactionLogFilePath = Paths.get(compactionStore.getCompactionLogPath(),
topic, String.valueOf(queueId)).toString();
this.compactionCqFilePath = compactionStore.getCompactionCqPath(); // batch consume queue already separated
this.positionMgr = compactionStore.getPositionMgr();
this.putMessageLock =
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() :
new PutMessageSpinLock();
this.readMessageLock =
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() :
new PutMessageSpinLock();
this.endMsgCallback = new CompactionAppendEndMsgCallback();
this.state = new AtomicReference<>(State.INITIALIZING);
// this.load();
log.info("CompactionLog {}:{} init completed.", topic, queueId);
}
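    /**
     * Align the compaction log file size to a whole multiple of the cq file size,
     * with a floor of five cq-sized files. The remainder rounds to the nearest
     * multiple: e.g. cqSize = 1024 and origLogSize = 10 * 1024 + 100 leaves
     * m = 100 < 512, so the result is 10 * 1024; a remainder of half a cq file or
     * more (or an exact multiple, since m == 0 falls into the else branch) rounds up.
     */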
private int getCompactionLogSize(int cqSize, int origLogSize) {
int n = origLogSize / cqSize;
if (n < 5) {
return cqSize * 5;
}
int m = origLogSize % cqSize;
if (m > 0 && m < (cqSize >> 1)) {
return n * cqSize;
} else {
return (n + 1) * cqSize;
}
}
public void load(boolean exitOk) throws IOException, RuntimeException {
initLogAndCq(exitOk);
if (defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE
&& getLog().isMappedFilesEmpty()) {
log.info("{}:{} load compactionLog from remote master", topic, queueId);
loadFromRemoteAsync();
} else {
state.compareAndSet(State.INITIALIZING, State.NORMAL);
}
}
private void initLogAndCq(boolean exitOk) throws IOException, RuntimeException {
current = new TopicPartitionLog(this);
current.init(exitOk);
}
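    /**
     * Replay a batch of messages pulled from the master. The body is a run of
     * CommitLog entries, each starting with its 4-byte total size, so the buffer
     * is sliced entry by entry and appended to the replicating log without being
     * re-encoded. Returns false once an entry's queue offset catches up with the
     * local minimum offset, telling the caller to stop pulling.
     */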
private boolean putMessageFromRemote(byte[] bytes) {
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
        // slice the buffer entry by entry so each message can be appended without re-encoding
while (byteBuffer.hasRemaining()) {
int mark = byteBuffer.position();
ByteBuffer bb = byteBuffer.slice();
int size = bb.getInt();
if (size < 0 || size > byteBuffer.capacity()) {
break;
} else {
bb.limit(size);
bb.rewind();
}
MessageExt messageExt = MessageDecoder.decode(bb, false, false);
long messageOffset = messageExt.getQueueOffset();
long minOffsetInQueue = getCQ().getMinOffsetInQueue();
if (getLog().isMappedFilesEmpty() || messageOffset < minOffsetInQueue) {
asyncPutMessage(bb, messageExt, replicating);
} else {
log.info("{}:{} message offset {} >= minOffsetInQueue {}, stop pull...",
topic, queueId, messageOffset, minOffsetInQueue);
return false;
}
byteBuffer.position(mark + size);
}
return true;
}
private void pullMessageFromMaster() throws Exception {
if (StringUtils.isBlank(compactionStore.getMasterAddr())) {
compactionStore.getCompactionSchedule().schedule(() -> {
try {
pullMessageFromMaster();
} catch (Exception e) {
log.error("pullMessageFromMaster exception: ", e);
}
}, 5, TimeUnit.SECONDS);
return;
}
replicating = new TopicPartitionLog(this, REPLICATING_SUB_FOLDER);
try (MessageFetcher messageFetcher = new MessageFetcher()) {
messageFetcher.pullMessageFromMaster(topic, queueId, getCQ().getMinOffsetInQueue(),
compactionStore.getMasterAddr(), (currOffset, response) -> {
if (currOffset < 0) {
log.info("{}:{} current offset {}, stop pull...", topic, queueId, currOffset);
return false;
}
return putMessageFromRemote(response.getBody());
// positionMgr.setOffset(topic, queueId, currOffset);
});
}
// merge files
if (getLog().isMappedFilesEmpty()) {
replaceFiles(getLog().getMappedFiles(), current, replicating);
} else if (replicating.getLog().isMappedFilesEmpty()) {
log.info("replicating message is empty"); //break
} else {
List<MappedFile> newFiles = Lists.newArrayList();
List<MappedFile> toCompactFiles = Lists.newArrayList(replicating.getLog().getMappedFiles());
putMessageLock.lock();
try {
// combine current and replicating to mappedFileList
newFiles = Lists.newArrayList(getLog().getMappedFiles());
toCompactFiles.addAll(newFiles); //all from current
current.roll(toCompactFiles.size() * compactionLogMappedFileSize);
} catch (Throwable e) {
log.error("roll log and cq exception: ", e);
} finally {
putMessageLock.unlock();
}
try {
// doCompaction with current and replicating
compactAndReplace(new FileList(toCompactFiles, newFiles));
} catch (Throwable e) {
log.error("do merge replicating and current exception: ", e);
}
}
// cleanReplicatingResource, force clean cq
replicating.clean(false, true);
// positionMgr.setOffset(topic, queueId, currentPullOffset);
state.compareAndSet(State.INITIALIZING, State.NORMAL);
}
private void loadFromRemoteAsync() {
compactionStore.getCompactionSchedule().submit(() -> {
try {
pullMessageFromMaster();
} catch (Exception e) {
log.error("fetch message from master exception: ", e);
}
});
// update (currentStatus) = LOADING
// request => get (start, end)
// pull message => current message offset > end
// done
// positionMgr.persist();
// update (currentStatus) = RUNNING
}
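    /**
     * On the master, or on a slave with offsetCheckInSlave enabled, the consumer's
     * next offset is corrected to the suggested value; otherwise the offset the
     * consumer sent is returned unchanged.
     */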
private long nextOffsetCorrection(long oldOffset, long newOffset) {
long nextOffset = oldOffset;
if (messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE || messageStoreConfig.isOffsetCheckInSlave()) {
nextOffset = newOffset;
}
return nextOffset;
}
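    /**
     * A message is regarded as resident on disk when it lags the max physical
     * offset by more than the memory reserved for hot data, i.e.
     * TOTAL_PHYSICAL_MEMORY_SIZE * accessMessageInMemoryMaxRatio / 100.
     */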
private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE *
(this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
return (maxOffsetPy - offsetPy) > memory;
}
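    /**
     * Batch cut-off for one pull: stop once the requested message count or byte
     * size would be exceeded, or once the store's per-transfer limits for in-disk
     * or in-memory messages are reached.
     */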
private boolean isTheBatchFull(int sizePy, int unitBatchNum, int maxMsgNums, long maxMsgSize,
int bufferTotal, int messageTotal, boolean isInDisk) {
if (0 == bufferTotal || 0 == messageTotal) {
return false;
}
if (messageTotal + unitBatchNum > maxMsgNums) {
return true;
}
if (bufferTotal + sizePy > maxMsgSize) {
return true;
}
if (isInDisk) {
if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
return true;
}
if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
return true;
}
} else {
if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
return true;
}
if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
return true;
}
}
return false;
}
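    /**
     * Start offset of the mapped file after the one containing {@code offset},
     * e.g. with a 1000-byte file size, offset 1234 maps to 2000.
     */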
public long rollNextFile(final long offset) {
return offset + compactionLogMappedFileSize - offset % compactionLogMappedFileSize;
}
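    /**
     * Compaction retention rule: keep the message if it lies beyond the range the
     * offset map covers, or if it is the latest occurrence of its key and still
     * carries a body (an empty body acts as a tombstone, letting the key drop out).
     * Messages without keys cannot be compacted and are discarded.
     */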
boolean shouldRetainMsg(final MessageExt msgExt, final OffsetMap map) throws DigestException {
if (msgExt.getQueueOffset() > map.getLastOffset()) {
return true;
}
String key = msgExt.getKeys();
if (StringUtils.isNotBlank(key)) {
boolean keyNotExistOrOffsetBigger = msgExt.getQueueOffset() >= map.get(key);
boolean hasBody = ArrayUtils.isNotEmpty(msgExt.getBody());
return keyNotExistOrOffsetBigger && hasBody;
} else {
log.error("message has no keys");
return false;
}
}
public void checkAndPutMessage(final SelectMappedBufferResult selectMappedBufferResult, final MessageExt msgExt,
final OffsetMap offsetMap, final TopicPartitionLog tpLog)
throws DigestException {
if (shouldRetainMsg(msgExt, offsetMap)) {
asyncPutMessage(selectMappedBufferResult.getByteBuffer(), msgExt, tpLog);
}
}
public CompletableFuture<PutMessageResult> asyncPutMessage(final SelectMappedBufferResult selectMappedBufferResult) {
return asyncPutMessage(selectMappedBufferResult, current);
}
public CompletableFuture<PutMessageResult> asyncPutMessage(final SelectMappedBufferResult selectMappedBufferResult,
final TopicPartitionLog tpLog) {
MessageExt msgExt = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer(), false, false);
return asyncPutMessage(selectMappedBufferResult.getByteBuffer(), msgExt, tpLog);
}
public CompletableFuture<PutMessageResult> asyncPutMessage(final ByteBuffer msgBuffer,
final MessageExt msgExt, final TopicPartitionLog tpLog) {
        // deduplicate: skip messages whose offset the queue already covers
if (tpLog.getCQ().getMaxOffsetInQueue() - 1 >= msgExt.getQueueOffset()) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
if (StringUtils.isBlank(msgExt.getKeys())) {
log.warn("message {}-{}:{} have no key, will not put in compaction log",
msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
putMessageLock.lock();
try {
long beginTime = System.nanoTime();
if (tpLog.isEmptyOrCurrentFileFull()) {
try {
tpLog.roll();
} catch (IOException e) {
log.error("create mapped file or consumerQueue exception: ", e);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
}
}
MappedFile mappedFile = tpLog.getLog().getLastMappedFile();
CompactionAppendMsgCallback callback = new CompactionAppendMessageCallback(msgExt, tpLog.getCQ());
AppendMessageResult result = mappedFile.appendMessage(msgBuffer, callback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
try {
tpLog.roll();
} catch (IOException e) {
log.error("create mapped file2 error, topic: {}, clientAddr: {}", msgExt.getTopic(), msgExt.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
mappedFile = tpLog.getLog().getLastMappedFile();
result = mappedFile.appendMessage(msgBuffer, callback);
break;
default:
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, result));
} finally {
putMessageLock.unlock();
}
}
private SelectMappedBufferResult getMessage(final long offset, final int size) {
MappedFile mappedFile = this.getLog().findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % compactionLogMappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
}
private boolean validateCqUnit(CqUnit cqUnit) {
return cqUnit.getPos() >= 0
&& cqUnit.getSize() > 0
&& cqUnit.getQueueOffset() >= 0
&& cqUnit.getBatchNum() > 0;
}
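    /**
     * Pull messages out of the compacted log, following the same shape as
     * DefaultMessageStore#getMessage: resolve the valid offset range from the
     * sparse consume queue, correct out-of-range requests, then walk cq units and
     * copy message buffers until one of the batch limits is hit. Suggests pulling
     * from a slave when the pull position lags further behind than memory allows.
     */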
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums, final int maxTotalMsgSize) {
readMessageLock.lock();
try {
long beginTime = System.nanoTime();
GetMessageStatus status;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
final long maxOffsetPy = getLog().getMaxOffset();
SparseConsumeQueue consumeQueue = getCQ();
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset == maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
} else {
long maxPullSize = Math.max(maxTotalMsgSize, 100);
if (maxPullSize > MAX_PULL_MSG_SIZE) {
log.warn("The max pull size is too large maxPullSize={} topic={} queueId={}",
maxPullSize, topic, queueId);
maxPullSize = MAX_PULL_MSG_SIZE;
}
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long maxPhyOffsetPulling = 0;
int cqFileNum = 0;
while (getResult.getBufferTotalSize() <= 0 && nextBeginOffset < maxOffset
&& cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFromOrNext(nextBeginOffset);
if (bufferConsumeQueue == null) {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(nextBeginOffset, consumeQueue.rollNextFile(nextBeginOffset));
log.warn("consumer request topic:{}, offset:{}, minOffset:{}, maxOffset:{}, "
+ "but access logic queue failed. correct nextBeginOffset to {}",
topic, offset, minOffset, maxOffset, nextBeginOffset);
break;
}
try {
long nextPhyFileStartOffset = Long.MIN_VALUE;
while (bufferConsumeQueue.hasNext() && nextBeginOffset < maxOffset) {
CqUnit cqUnit = bufferConsumeQueue.next();
if (!validateCqUnit(cqUnit)) {
break;
}
long offsetPy = cqUnit.getPos();
int sizePy = cqUnit.getSize();
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize,
getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
break;
}
if (getResult.getBufferTotalSize() >= maxPullSize) {
break;
}
maxPhyOffsetPulling = offsetPy;
                            // be careful: nextBeginOffset must advance only after the isTheBatchFull check above
nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset) {
continue;
}
}
SelectMappedBufferResult selectResult = getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
// nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
nextPhyFileStartOffset = rollNextFile(offsetPy);
continue;
}
getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
} finally {
bufferConsumeQueue.release();
}
}
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
} finally {
readMessageLock.unlock();
}
}
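    /**
     * Pick the compaction candidates: every sealed file (all but the last, still
     * written one) is rewritten, while the offset map is built only from the files
     * whose max queue offset is newer than the last recorded compaction position.
     */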
FileList getCompactionFile() {
List<MappedFile> mappedFileList = Lists.newArrayList(getLog().getMappedFiles());
if (mappedFileList.size() < 2) {
return null;
}
List<MappedFile> toCompactFiles = mappedFileList.subList(0, mappedFileList.size() - 1);
//exclude the last writing file
List<MappedFile> newFiles = Lists.newArrayList();
for (int i = 0; i < mappedFileList.size() - 1; i++) {
MappedFile mf = mappedFileList.get(i);
long maxQueueOffsetInFile = getCQ().getMaxMsgOffsetFromFile(mf.getFile().getName());
if (maxQueueOffsetInFile > positionMgr.getOffset(topic, queueId)) {
newFiles.add(mf);
}
}
if (newFiles.isEmpty()) {
return null;
}
return new FileList(toCompactFiles, newFiles);
}
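    /**
     * One compaction round: build the key-to-latest-offset map from the new files,
     * rewrite the candidate files into the "compacting" sub-folder keeping only
     * retained messages, swap the rewritten files into place, then record and
     * persist the new compaction position.
     */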
void compactAndReplace(FileList compactFiles) throws Throwable {
if (compactFiles == null || compactFiles.isEmpty()) {
return;
}
long startTime = System.nanoTime();
OffsetMap offsetMap = getOffsetMap(compactFiles.newFiles);
compaction(compactFiles.toCompactFiles, offsetMap);
replaceFiles(compactFiles.toCompactFiles, current, compacting);
positionMgr.setOffset(topic, queueId, offsetMap.lastOffset);
positionMgr.persist();
compacting.clean(false, false);
log.info("this compaction elapsed {} milliseconds",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}
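    /**
     * Scheduled entry point; the NORMAL -> COMPACTING state transition guarantees
     * that at most one compaction round runs at a time.
     */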
void doCompaction() {
if (!state.compareAndSet(State.NORMAL, State.COMPACTING)) {
log.warn("compactionLog state is {}, skip this time", state.get());
return;
}
try {
compactAndReplace(getCompactionFile());
} catch (Throwable e) {
log.error("do compaction exception: ", e);
}
state.compareAndSet(State.COMPACTING, State.NORMAL);
}
protected OffsetMap getOffsetMap(List<MappedFile> mappedFileList) throws NoSuchAlgorithmException, DigestException {
OffsetMap offsetMap = new OffsetMap(offsetMapMemorySize);
for (MappedFile mappedFile : mappedFileList) {
Iterator<SelectMappedBufferResult> iterator = mappedFile.iterator(0);
while (iterator.hasNext()) {
SelectMappedBufferResult smb = null;
try {
smb = iterator.next();
//decode bytebuffer
MessageExt msg = MessageDecoder.decode(smb.getByteBuffer(), true, false);
if (msg != null) {
                        // get key & offset, then put into the offsetMap
if (msg.getQueueOffset() > positionMgr.getOffset(topic, queueId)) {
offsetMap.put(msg.getKeys(), msg.getQueueOffset());
}
} else {
                        // a null msg indicates the end of the file
break;
}
} catch (DigestException e) {
log.error("offsetMap put exception: ", e);
throw e;
} finally {
if (smb != null) {
smb.release();
}
}
}
}
return offsetMap;
}
protected void putEndMessage(MappedFileQueue mappedFileQueue) {
MappedFile lastFile = mappedFileQueue.getLastMappedFile();
if (!lastFile.isFull()) {
lastFile.appendMessage(ByteBuffer.allocate(0), endMsgCallback);
}
}
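    /**
     * Rewrite pass: stream every message of the candidate files through
     * {@link #checkAndPutMessage}, appending survivors to a fresh log under the
     * "compacting" sub-folder, and close the last file with an end marker.
     */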
protected void compaction(List<MappedFile> mappedFileList, OffsetMap offsetMap) throws DigestException {
compacting = new TopicPartitionLog(this, COMPACTING_SUB_FOLDER);
for (MappedFile mappedFile : mappedFileList) {
Iterator<SelectMappedBufferResult> iterator = mappedFile.iterator(0);
while (iterator.hasNext()) {
SelectMappedBufferResult smb = null;
try {
smb = iterator.next();
MessageExt msgExt = MessageDecoder.decode(smb.getByteBuffer(), true, true);
if (msgExt == null) {
// file end
break;
} else {
checkAndPutMessage(smb, msgExt, offsetMap, compacting);
}
} finally {
if (smb != null) {
smb.release();
}
}
}
}
putEndMessage(compacting.getLog());
}
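    /**
     * Swap the compacted files into the live log: rename the replaced originals
     * for deletion, move the rewritten files up out of their sub-folder, and
     * rebuild the live MappedFileQueue under the read lock so concurrent reads
     * never see a half-swapped list; the matching cq files are swapped alike.
     */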
protected void replaceFiles(List<MappedFile> mappedFileList, TopicPartitionLog current,
TopicPartitionLog newLog) {
MappedFileQueue dest = current.getLog();
MappedFileQueue src = newLog.getLog();
long beginTime = System.nanoTime();
// List<String> fileNameToReplace = mappedFileList.stream()
// .map(m -> m.getFile().getName())
// .collect(Collectors.toList());
List<String> fileNameToReplace = dest.getMappedFiles().stream()
.filter(mappedFileList::contains)
.map(mf -> mf.getFile().getName())
.collect(Collectors.toList());
mappedFileList.forEach(MappedFile::renameToDelete);
src.getMappedFiles().forEach(mappedFile -> {
try {
mappedFile.moveToParent();
} catch (IOException e) {
log.error("move file {} to parent directory exception: ", mappedFile.getFileName());
}
});
dest.getMappedFiles().stream()
.filter(m -> !mappedFileList.contains(m))
.forEach(m -> src.getMappedFiles().add(m));
readMessageLock.lock();
try {
mappedFileList.forEach(mappedFile -> mappedFile.destroy(1000));
dest.getMappedFiles().clear();
dest.getMappedFiles().addAll(src.getMappedFiles());
src.getMappedFiles().clear();
replaceCqFiles(getCQ(), newLog.getCQ(), fileNameToReplace);
log.info("replace file elapsed {} milliseconds",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime));
} finally {
readMessageLock.unlock();
}
}
protected void replaceCqFiles(SparseConsumeQueue currentBcq, SparseConsumeQueue compactionBcq,
List<String> fileNameToReplace) {
long beginTime = System.nanoTime();
MappedFileQueue currentMq = currentBcq.getMappedFileQueue();
MappedFileQueue compactMq = compactionBcq.getMappedFileQueue();
List<MappedFile> fileListToDelete = currentMq.getMappedFiles().stream().filter(m ->
fileNameToReplace.contains(m.getFile().getName())).collect(Collectors.toList());
fileListToDelete.forEach(MappedFile::renameToDelete);
compactMq.getMappedFiles().forEach(mappedFile -> {
try {
mappedFile.moveToParent();
} catch (IOException e) {
log.error("move consume queue file {} to parent directory exception: ", mappedFile.getFileName(), e);
}
});
currentMq.getMappedFiles().stream()
.filter(m -> !fileListToDelete.contains(m))
.forEach(m -> compactMq.getMappedFiles().add(m));
fileListToDelete.forEach(mappedFile -> mappedFile.destroy(1000));
currentMq.getMappedFiles().clear();
currentMq.getMappedFiles().addAll(compactMq.getMappedFiles());
compactMq.getMappedFiles().clear();
currentBcq.refresh();
log.info("replace consume queue file elapsed {} millsecs.",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime));
}
public MappedFileQueue getLog() {
return current.mappedFileQueue;
}
public SparseConsumeQueue getCQ() {
return current.consumeQueue;
}
// public SparseConsumeQueue getCompactionScq() {
// return compactionScq;
// }
public void flushCQ(int flushLeastPages) {
getCQ().flush(flushLeastPages);
}
static class CompactionAppendEndMsgCallback implements CompactionAppendMsgCallback {
@Override
public AppendMessageResult doAppend(ByteBuffer bbDest, long fileFromOffset, int maxBlank, ByteBuffer bbSrc) {
            // stamp the remaining blank space with its length and BLANK_MAGIC_CODE so recovery can spot the file end
            bbDest.putInt(maxBlank);
            bbDest.putInt(BLANK_MAGIC_CODE);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE,
fileFromOffset + bbDest.position(), maxBlank, System.currentTimeMillis());
}
}
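    /**
     * Appends one already-encoded message to the compaction log and indexes it in
     * the sparse consume queue. When either the cq file or the log file cannot
     * hold another entry, both are sealed and END_OF_FILE returned so the caller
     * rolls to new files. The message keeps its queue offset but lands at a new
     * physical position, so the PHYSICALOFFSET header field (8 bytes after
     * QUEUEOFFSET, which follows the 20-byte TOTALSIZE/MAGICCODE/BODYCRC/QUEUEID/FLAG
     * prefix) is patched in place.
     */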
static class CompactionAppendMessageCallback implements CompactionAppendMsgCallback {
private final MessageExt msgExt;
private final SparseConsumeQueue bcq;
public CompactionAppendMessageCallback(MessageExt msgExt, SparseConsumeQueue bcq) {
this.msgExt = msgExt;
this.bcq = bcq;
}
@Override
public AppendMessageResult doAppend(ByteBuffer bbDest, long fileFromOffset, int maxBlank, ByteBuffer bbSrc) {
String topic = msgExt.getTopic();
int queueId = msgExt.getQueueId();
String tags = msgExt.getTags();
long storeTimestamp = msgExt.getStoreTimestamp();
final int msgLen = bbSrc.getInt(0);
MappedFile bcqMappedFile = bcq.getMappedFileQueue().getLastMappedFile();
if (bcqMappedFile.getWrotePosition() + BatchConsumeQueue.CQ_STORE_UNIT_SIZE >= bcqMappedFile.getFileSize()
|| (msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { //bcq will full or log will full
bcq.putEndPositionInfo(bcqMappedFile);
bbDest.putInt(maxBlank);
bbDest.putInt(BLANK_MAGIC_CODE);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE,
fileFromOffset + bbDest.position(), maxBlank, storeTimestamp);
}
//get logic offset and physical offset
            int logicOffsetPos = 4 + 4 + 4 + 4 + 4; // skip TOTALSIZE, MAGICCODE, BODYCRC, QUEUEID, FLAG to reach QUEUEOFFSET
long logicOffset = bbSrc.getLong(logicOffsetPos);
int destPos = bbDest.position();
long physicalOffset = fileFromOffset + bbDest.position();
bbSrc.rewind();
bbSrc.limit(msgLen);
bbDest.put(bbSrc);
bbDest.putLong(destPos + logicOffsetPos + 8, physicalOffset); //replace physical offset
boolean result = bcq.putBatchMessagePositionInfo(physicalOffset, msgLen,
MessageExtBrokerInner.tagsString2tagsCode(tags), storeTimestamp, logicOffset, (short)1);
if (!result) {
log.error("put message {}-{} position info failed", topic, queueId);
}
return new AppendMessageResult(AppendMessageStatus.PUT_OK, physicalOffset, msgLen, storeTimestamp);
}
}
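    /**
     * Map from message-key digest to the key's latest queue offset, in the style
     * of the Kafka log cleaner's offset map: fixed-capacity open addressing over
     * one ByteBuffer, each slot holding an MD5 digest (16 bytes) plus an 8-byte
     * offset, with re-probing on collisions via indexOf(hash, tryNum). Note that
     * isEmpty() scans 24 bytes per slot and so assumes an MD5-sized digest.
     */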
static class OffsetMap {
private ByteBuffer dataBytes;
private int capacity;
private int entrySize;
private int entryNum;
private MessageDigest digest;
private int hashSize;
private long lastOffset;
private byte[] hash1;
private byte[] hash2;
public OffsetMap(int memorySize) throws NoSuchAlgorithmException {
this(memorySize, MessageDigest.getInstance("MD5"));
}
public OffsetMap(int memorySize, MessageDigest digest) {
this.hashSize = digest.getDigestLength();
this.entrySize = hashSize + (Long.SIZE / Byte.SIZE);
this.capacity = Math.max(memorySize / entrySize, 100);
this.dataBytes = ByteBuffer.allocate(capacity * entrySize);
this.hash1 = new byte[hashSize];
this.hash2 = new byte[hashSize];
this.entryNum = 0;
this.digest = digest;
}
public void put(String key, final long offset) throws DigestException {
if (entryNum >= capacity) {
throw new IllegalArgumentException("offset map is full");
}
hashInto(key, hash1);
int tryNum = 0;
int index = indexOf(hash1, tryNum);
while (!isEmpty(index)) {
dataBytes.position(index);
dataBytes.get(hash2);
if (Arrays.equals(hash1, hash2)) {
dataBytes.putLong(offset);
lastOffset = offset;
return;
}
tryNum++;
index = indexOf(hash1, tryNum);
}
dataBytes.position(index);
dataBytes.put(hash1);
dataBytes.putLong(offset);
lastOffset = offset;
entryNum += 1;
}
public long get(String key) throws DigestException {
hashInto(key, hash1);
int tryNum = 0;
int maxTryNum = entryNum + hashSize - 4;
int index = 0;
do {
if (tryNum >= maxTryNum) {
return -1L;
}
index = indexOf(hash1, tryNum);
dataBytes.position(index);
if (isEmpty(index)) {
return -1L;
}
dataBytes.get(hash2);
tryNum++;
} while (!Arrays.equals(hash1, hash2));
return dataBytes.getLong();
}
public long getLastOffset() {
return lastOffset;
}
private boolean isEmpty(int pos) {
return dataBytes.getLong(pos) == 0
&& dataBytes.getLong(pos + 8) == 0
&& dataBytes.getLong(pos + 16) == 0;
}
private int indexOf(byte[] hash, int tryNum) {
int index = readInt(hash, Math.min(tryNum, hashSize - 4)) + Math.max(0, tryNum - hashSize + 4);
int entry = Math.abs(index) % capacity;
return entry * entrySize;
}
private void hashInto(String key, byte[] buf) throws DigestException {
digest.update(key.getBytes(StandardCharsets.UTF_8));
digest.digest(buf, 0, hashSize);
}
private int readInt(byte[] buf, int offset) {
return ((buf[offset] & 0xFF) << 24) |
((buf[offset + 1] & 0xFF) << 16) |
((buf[offset + 2] & 0xFF) << 8) |
((buf[offset + 3] & 0xFF));
}
}
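    /**
     * A compaction log plus its sparse consume queue for one topic/queue pair,
     * optionally housed in a sub-folder ("compacting" or "replicating") so a
     * rewrite can be prepared beside the live files before being swapped in.
     */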
static class TopicPartitionLog {
MappedFileQueue mappedFileQueue;
SparseConsumeQueue consumeQueue;
public TopicPartitionLog(CompactionLog compactionLog) {
this(compactionLog, null);
}
public TopicPartitionLog(CompactionLog compactionLog, String subFolder) {
if (StringUtils.isBlank(subFolder)) {
mappedFileQueue = new MappedFileQueue(compactionLog.compactionLogFilePath,
compactionLog.compactionLogMappedFileSize, null);
consumeQueue = new SparseConsumeQueue(compactionLog.topic, compactionLog.queueId,
compactionLog.compactionCqFilePath, compactionLog.compactionCqMappedFileSize,
compactionLog.defaultMessageStore);
} else {
mappedFileQueue = new MappedFileQueue(compactionLog.compactionLogFilePath + File.separator + subFolder,
compactionLog.compactionLogMappedFileSize, null);
consumeQueue = new SparseConsumeQueue(compactionLog.topic, compactionLog.queueId,
compactionLog.compactionCqFilePath, compactionLog.compactionCqMappedFileSize,
compactionLog.defaultMessageStore, subFolder);
}
}
public void shutdown() {
mappedFileQueue.shutdown(1000 * 30);
consumeQueue.getMappedFileQueue().shutdown(1000 * 30);
}
public void init(boolean exitOk) throws IOException, RuntimeException {
if (!mappedFileQueue.load()) {
shutdown();
throw new IOException("load log exception");
}
if (!consumeQueue.load()) {
shutdown();
throw new IOException("load consume queue exception");
}
try {
consumeQueue.recover();
recover();
sanityCheck();
} catch (Exception e) {
shutdown();
throw e;
}
}
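        /**
         * Trim the log back to the max physical offset recorded in the consume
         * queue, dropping tail bytes that were appended but never indexed.
         */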
private void recover() {
long maxCqPhysicOffset = consumeQueue.getMaxPhyOffsetInLog();
log.info("{}:{} max physical offset in compaction log is {}",
consumeQueue.getTopic(), consumeQueue.getQueueId(), maxCqPhysicOffset);
if (maxCqPhysicOffset > 0) {
this.mappedFileQueue.setFlushedWhere(maxCqPhysicOffset);
this.mappedFileQueue.setCommittedWhere(maxCqPhysicOffset);
this.mappedFileQueue.truncateDirtyFiles(maxCqPhysicOffset);
}
}
void sanityCheck() throws RuntimeException {
List<MappedFile> mappedFileList = mappedFileQueue.getMappedFiles();
for (MappedFile file : mappedFileList) {
if (!consumeQueue.containsOffsetFile(Long.parseLong(file.getFile().getName()))) {
throw new RuntimeException("log file mismatch with consumeQueue file " + file.getFileName());
}
}
List<MappedFile> cqMappedFileList = consumeQueue.getMappedFileQueue().getMappedFiles();
for (MappedFile file: cqMappedFileList) {
if (mappedFileList.stream().noneMatch(m -> Objects.equals(m.getFile().getName(), file.getFile().getName()))) {
throw new RuntimeException("consumeQueue file mismatch with log file " + file.getFileName());
}
}
}
public synchronized void roll() throws IOException {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
if (mappedFile == null) {
throw new IOException("create new file error");
}
long baseOffset = mappedFile.getFileFromOffset();
MappedFile cqFile = consumeQueue.createFile(baseOffset);
if (cqFile == null) {
mappedFile.destroy(1000);
mappedFileQueue.getMappedFiles().remove(mappedFile);
throw new IOException("create new consumeQueue file error");
}
}
public synchronized void roll(int baseOffset) throws IOException {
MappedFile mappedFile = mappedFileQueue.tryCreateMappedFile(baseOffset);
if (mappedFile == null) {
throw new IOException("create new file error");
}
MappedFile cqFile = consumeQueue.createFile(baseOffset);
if (cqFile == null) {
mappedFile.destroy(1000);
mappedFileQueue.getMappedFiles().remove(mappedFile);
throw new IOException("create new consumeQueue file error");
}
}
public boolean isEmptyOrCurrentFileFull() {
return mappedFileQueue.isEmptyOrCurrentFileFull() ||
consumeQueue.getMappedFileQueue().isEmptyOrCurrentFileFull();
}
public void clean(MappedFileQueue mappedFileQueue) throws IOException {
for (MappedFile mf : mappedFileQueue.getMappedFiles()) {
if (mf.getFile().exists()) {
log.error("directory {} with {} not empty.", mappedFileQueue.getStorePath(), mf.getFileName());
throw new IOException("directory " + mappedFileQueue.getStorePath() + " not empty.");
}
}
mappedFileQueue.destroy();
}
public void clean(boolean forceCleanLog, boolean forceCleanCq) throws IOException {
//clean and delete sub_folder
if (forceCleanLog) {
mappedFileQueue.destroy();
} else {
clean(mappedFileQueue);
}
if (forceCleanCq) {
consumeQueue.getMappedFileQueue().destroy();
} else {
clean(consumeQueue.getMappedFileQueue());
}
}
public MappedFileQueue getLog() {
return mappedFileQueue;
}
public SparseConsumeQueue getCQ() {
return consumeQueue;
}
}
    enum State {
NORMAL,
INITIALIZING,
COMPACTING,
}
static class FileList {
List<MappedFile> newFiles;
List<MappedFile> toCompactFiles;
public FileList(List<MappedFile> toCompactFiles, List<MappedFile> newFiles) {
this.toCompactFiles = toCompactFiles;
this.newFiles = newFiles;
}
boolean isEmpty() {
return CollectionUtils.isEmpty(newFiles) || CollectionUtils.isEmpty(toCompactFiles);
}
}
}