| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.rocketmq.store.dledger; |
| |
| import io.openmessaging.storage.dledger.AppendFuture; |
| import io.openmessaging.storage.dledger.BatchAppendFuture; |
| import io.openmessaging.storage.dledger.DLedgerConfig; |
| import io.openmessaging.storage.dledger.DLedgerServer; |
| import io.openmessaging.storage.dledger.entry.DLedgerEntry; |
| import io.openmessaging.storage.dledger.protocol.AppendEntryRequest; |
| import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; |
| import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest; |
| import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; |
| import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; |
| import io.openmessaging.storage.dledger.store.file.MmapFile; |
| import io.openmessaging.storage.dledger.store.file.MmapFileList; |
| import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; |
| import io.openmessaging.storage.dledger.utils.DLedgerUtils; |
| import java.net.Inet6Address; |
| import java.net.InetSocketAddress; |
| import java.nio.ByteBuffer; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.rocketmq.common.UtilAll; |
| import org.apache.rocketmq.common.message.MessageAccessor; |
| import org.apache.rocketmq.common.message.MessageConst; |
| import org.apache.rocketmq.common.message.MessageDecoder; |
| import org.apache.rocketmq.common.message.MessageExtBatch; |
| import org.apache.rocketmq.common.sysflag.MessageSysFlag; |
| import org.apache.rocketmq.common.topic.TopicValidator; |
| import org.apache.rocketmq.store.AppendMessageResult; |
| import org.apache.rocketmq.store.AppendMessageStatus; |
| import org.apache.rocketmq.store.CommitLog; |
| import org.apache.rocketmq.store.DefaultMessageStore; |
| import org.apache.rocketmq.store.DispatchRequest; |
| import org.apache.rocketmq.store.MappedFile; |
| import org.apache.rocketmq.store.MessageExtBrokerInner; |
| import org.apache.rocketmq.store.PutMessageResult; |
| import org.apache.rocketmq.store.PutMessageStatus; |
| import org.apache.rocketmq.store.SelectMappedBufferResult; |
| import org.apache.rocketmq.store.StoreStatsService; |
| import org.apache.rocketmq.store.schedule.ScheduleMessageService; |
| |
| /** |
| * Store all metadata downtime for recovery, data protection reliability |
| */ |
| public class DLedgerCommitLog extends CommitLog { |
| private final DLedgerServer dLedgerServer; |
| private final DLedgerConfig dLedgerConfig; |
| private final DLedgerMmapFileStore dLedgerFileStore; |
| private final MmapFileList dLedgerFileList; |
| |
| //The id identifies the broker role, 0 means master, others means slave |
| private final int id; |
| |
| private final MessageSerializer messageSerializer; |
| private volatile long beginTimeInDledgerLock = 0; |
| |
| //This offset separate the old commitlog from dledger commitlog |
| private long dividedCommitlogOffset = -1; |
| |
| private boolean isInrecoveringOldCommitlog = false; |
| |
| private final StringBuilder msgIdBuilder = new StringBuilder(); |
| |
| public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { |
| super(defaultMessageStore); |
| dLedgerConfig = new DLedgerConfig(); |
| dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); |
| dLedgerConfig.setStoreType(DLedgerConfig.FILE); |
| dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); |
| dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); |
| dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); |
| dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); |
| dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()); |
| dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); |
| dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); |
| dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId()); |
| dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush()); |
| |
| id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; |
| dLedgerServer = new DLedgerServer(dLedgerConfig); |
| dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); |
| DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { |
| assert bodyOffset == DLedgerEntry.BODY_OFFSET; |
| buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); |
| buffer.putLong(entry.getPos() + bodyOffset); |
| }; |
| dLedgerFileStore.addAppendHook(appendHook); |
| dLedgerFileList = dLedgerFileStore.getDataFileList(); |
| this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); |
| |
| } |
| |
| @Override |
| public boolean load() { |
| return super.load(); |
| } |
| |
| private void refreshConfig() { |
| dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); |
| dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); |
| dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); |
| } |
| |
| private void disableDeleteDledger() { |
| dLedgerConfig.setEnableDiskForceClean(false); |
| dLedgerConfig.setFileReservedHours(24 * 365 * 10); |
| } |
| |
| @Override |
| public void start() { |
| dLedgerServer.startup(); |
| } |
| |
| @Override |
| public void shutdown() { |
| dLedgerServer.shutdown(); |
| } |
| |
| @Override |
| public long flush() { |
| dLedgerFileStore.flush(); |
| return dLedgerFileList.getFlushedWhere(); |
| } |
| |
| @Override |
| public long getMaxOffset() { |
| if (dLedgerFileStore.getCommittedPos() > 0) { |
| return dLedgerFileStore.getCommittedPos(); |
| } |
| if (dLedgerFileList.getMinOffset() > 0) { |
| return dLedgerFileList.getMinOffset(); |
| } |
| return 0; |
| } |
| |
| @Override |
| public long getMinOffset() { |
| if (!mappedFileQueue.getMappedFiles().isEmpty()) { |
| return mappedFileQueue.getMinOffset(); |
| } |
| return dLedgerFileList.getMinOffset(); |
| } |
| |
| @Override |
| public long getConfirmOffset() { |
| return this.getMaxOffset(); |
| } |
| |
| @Override |
| public void setConfirmOffset(long phyOffset) { |
| log.warn("Should not set confirm offset {} for dleger commitlog", phyOffset); |
| } |
| |
| @Override |
| public long remainHowManyDataToCommit() { |
| return dLedgerFileList.remainHowManyDataToCommit(); |
| } |
| |
| @Override |
| public long remainHowManyDataToFlush() { |
| return dLedgerFileList.remainHowManyDataToFlush(); |
| } |
| |
| @Override |
| public int deleteExpiredFile( |
| final long expiredTime, |
| final int deleteFilesInterval, |
| final long intervalForcibly, |
| final boolean cleanImmediately |
| ) { |
| if (mappedFileQueue.getMappedFiles().isEmpty()) { |
| refreshConfig(); |
| //To prevent too much log in defaultMessageStore |
| return Integer.MAX_VALUE; |
| } else { |
| disableDeleteDledger(); |
| } |
| int count = super.deleteExpiredFile(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); |
| if (count > 0 || mappedFileQueue.getMappedFiles().size() != 1) { |
| return count; |
| } |
| //the old logic will keep the last file, here to delete it |
| MappedFile mappedFile = mappedFileQueue.getLastMappedFile(); |
| log.info("Try to delete the last old commitlog file {}", mappedFile.getFileName()); |
| long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; |
| if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { |
| while (!mappedFile.destroy(10 * 1000)) { |
| DLedgerUtils.sleep(1000); |
| } |
| mappedFileQueue.getMappedFiles().remove(mappedFile); |
| } |
| return 1; |
| } |
| |
| public SelectMappedBufferResult convertSbr(SelectMmapBufferResult sbr) { |
| if (sbr == null) { |
| return null; |
| } else { |
| return new DLedgerSelectMappedBufferResult(sbr); |
| } |
| |
| } |
| |
| public SelectMmapBufferResult truncate(SelectMmapBufferResult sbr) { |
| long committedPos = dLedgerFileStore.getCommittedPos(); |
| if (sbr == null || sbr.getStartOffset() == committedPos) { |
| return null; |
| } |
| if (sbr.getStartOffset() + sbr.getSize() <= committedPos) { |
| return sbr; |
| } else { |
| sbr.setSize((int) (committedPos - sbr.getStartOffset())); |
| return sbr; |
| } |
| } |
| |
| @Override |
| public SelectMappedBufferResult getData(final long offset) { |
| if (offset < dividedCommitlogOffset) { |
| return super.getData(offset); |
| } |
| return this.getData(offset, offset == 0); |
| } |
| |
| @Override |
| public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { |
| if (offset < dividedCommitlogOffset) { |
| return super.getData(offset, returnFirstOnNotFound); |
| } |
| if (offset >= dLedgerFileStore.getCommittedPos()) { |
| return null; |
| } |
| int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); |
| MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, returnFirstOnNotFound); |
| if (mappedFile != null) { |
| int pos = (int) (offset % mappedFileSize); |
| SelectMmapBufferResult sbr = mappedFile.selectMappedBuffer(pos); |
| return convertSbr(truncate(sbr)); |
| } |
| |
| return null; |
| } |
| |
| private void recover(long maxPhyOffsetOfConsumeQueue) { |
| dLedgerFileStore.load(); |
| if (dLedgerFileList.getMappedFiles().size() > 0) { |
| dLedgerFileStore.recover(); |
| dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); |
| MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); |
| if (mappedFile != null) { |
| disableDeleteDledger(); |
| } |
| long maxPhyOffset = dLedgerFileList.getMaxWrotePosition(); |
| // Clear ConsumeQueue redundant data |
| if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) { |
| log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset); |
| this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset); |
| } |
| return; |
| } |
| //Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog |
| isInrecoveringOldCommitlog = true; |
| //No need the abnormal recover |
| super.recoverNormally(maxPhyOffsetOfConsumeQueue); |
| isInrecoveringOldCommitlog = false; |
| MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); |
| if (mappedFile == null) { |
| return; |
| } |
| ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); |
| byteBuffer.position(mappedFile.getWrotePosition()); |
| boolean needWriteMagicCode = true; |
| // 1 TOTAL SIZE |
| byteBuffer.getInt(); //size |
| int magicCode = byteBuffer.getInt(); |
| if (magicCode == CommitLog.BLANK_MAGIC_CODE) { |
| needWriteMagicCode = false; |
| } else { |
| log.info("Recover old commitlog found a illegal magic code={}", magicCode); |
| } |
| dLedgerConfig.setEnableDiskForceClean(false); |
| dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); |
| log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); |
| if (needWriteMagicCode) { |
| byteBuffer.position(mappedFile.getWrotePosition()); |
| byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); |
| byteBuffer.putInt(BLANK_MAGIC_CODE); |
| mappedFile.flush(0); |
| } |
| mappedFile.setWrotePosition(mappedFile.getFileSize()); |
| mappedFile.setCommittedPosition(mappedFile.getFileSize()); |
| mappedFile.setFlushedPosition(mappedFile.getFileSize()); |
| dLedgerFileList.getLastMappedFile(dividedCommitlogOffset); |
| log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset); |
| } |
| |
| @Override |
| public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { |
| recover(maxPhyOffsetOfConsumeQueue); |
| } |
| |
| @Override |
| public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { |
| recover(maxPhyOffsetOfConsumeQueue); |
| } |
| |
| @Override |
| public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC) { |
| return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true); |
| } |
| |
| @Override |
| public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, |
| final boolean readBody) { |
| if (isInrecoveringOldCommitlog) { |
| return super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); |
| } |
| try { |
| int bodyOffset = DLedgerEntry.BODY_OFFSET; |
| int pos = byteBuffer.position(); |
| int magic = byteBuffer.getInt(); |
| //In dledger, this field is size, it must be gt 0, so it could prevent collision |
| int magicOld = byteBuffer.getInt(); |
| if (magicOld == CommitLog.BLANK_MAGIC_CODE || magicOld == CommitLog.MESSAGE_MAGIC_CODE) { |
| byteBuffer.position(pos); |
| return super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); |
| } |
| if (magic == MmapFileList.BLANK_MAGIC_CODE) { |
| return new DispatchRequest(0, true); |
| } |
| byteBuffer.position(pos + bodyOffset); |
| DispatchRequest dispatchRequest = super.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody); |
| if (dispatchRequest.isSuccess()) { |
| dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); |
| } else if (dispatchRequest.getMsgSize() > 0) { |
| dispatchRequest.setBufferSize(dispatchRequest.getMsgSize() + bodyOffset); |
| } |
| return dispatchRequest; |
| } catch (Throwable ignored) { |
| } |
| |
| return new DispatchRequest(-1, false /* success */); |
| } |
| |
| @Override |
| public boolean resetOffset(long offset) { |
| //currently, it seems resetOffset has no use |
| return false; |
| } |
| |
| @Override |
| public long getBeginTimeInLock() { |
| return beginTimeInDledgerLock; |
| } |
| |
| private void setMessageInfo(MessageExtBrokerInner msg, int tranType) { |
| // Set the storage time |
| msg.setStoreTimestamp(System.currentTimeMillis()); |
| // Set the message body BODY CRC (consider the most appropriate setting |
| // on the client) |
| msg.setBodyCRC(UtilAll.crc32(msg.getBody())); |
| |
| //should be consistent with the old version |
| if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE |
| || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { |
| // Delay Delivery |
| if (msg.getDelayTimeLevel() > 0) { |
| if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { |
| msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); |
| } |
| |
| |
| String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; |
| int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); |
| |
| // Backup real topic, queueId |
| MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); |
| MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); |
| msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); |
| |
| msg.setTopic(topic); |
| msg.setQueueId(queueId); |
| } |
| } |
| |
| InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost(); |
| if (bornSocketAddress.getAddress() instanceof Inet6Address) { |
| msg.setBornHostV6Flag(); |
| } |
| |
| InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost(); |
| if (storeSocketAddress.getAddress() instanceof Inet6Address) { |
| msg.setStoreHostAddressV6Flag(); |
| } |
| } |
| |
| @Override |
| public PutMessageResult putMessage(final MessageExtBrokerInner msg) { |
| |
| StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); |
| final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); |
| String topic = msg.getTopic(); |
| setMessageInfo(msg,tranType); |
| |
| // Back to Results |
| AppendMessageResult appendResult; |
| AppendFuture<AppendEntryResponse> dledgerFuture; |
| EncodeResult encodeResult; |
| |
| putMessageLock.lock(); //spin or ReentrantLock ,depending on store config |
| long elapsedTimeInLock; |
| long queueOffset; |
| try { |
| beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); |
| encodeResult = this.messageSerializer.serialize(msg); |
| queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); |
| encodeResult.setQueueOffsetKey(queueOffset); |
| if (encodeResult.status != AppendMessageStatus.PUT_OK) { |
| return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); |
| } |
| AppendEntryRequest request = new AppendEntryRequest(); |
| request.setGroup(dLedgerConfig.getGroup()); |
| request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); |
| request.setBody(encodeResult.getData()); |
| dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request); |
| if (dledgerFuture.getPos() == -1) { |
| return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); |
| } |
| long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; |
| |
| int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; |
| ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); |
| |
| String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); |
| elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; |
| appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); |
| switch (tranType) { |
| case MessageSysFlag.TRANSACTION_PREPARED_TYPE: |
| case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: |
| break; |
| case MessageSysFlag.TRANSACTION_NOT_TYPE: |
| case MessageSysFlag.TRANSACTION_COMMIT_TYPE: |
| // The next update ConsumeQueue information |
| DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); |
| break; |
| default: |
| break; |
| } |
| } catch (Exception e) { |
| log.error("Put message error", e); |
| return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); |
| } finally { |
| beginTimeInDledgerLock = 0; |
| putMessageLock.unlock(); |
| } |
| |
| if (elapsedTimeInLock > 500) { |
| log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); |
| } |
| |
| PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; |
| try { |
| AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); |
| switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { |
| case SUCCESS: |
| putMessageStatus = PutMessageStatus.PUT_OK; |
| break; |
| case INCONSISTENT_LEADER: |
| case NOT_LEADER: |
| case LEADER_NOT_READY: |
| case DISK_FULL: |
| putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; |
| break; |
| case WAIT_QUORUM_ACK_TIMEOUT: |
| //Do not return flush_slave_timeout to the client, for the ons client will ignore it. |
| putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; |
| break; |
| case LEADER_PENDING_FULL: |
| putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; |
| break; |
| } |
| } catch (Throwable t) { |
| log.error("Failed to get dledger append result", t); |
| } |
| |
| PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); |
| if (putMessageStatus == PutMessageStatus.PUT_OK) { |
| // Statistics |
| storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); |
| storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes()); |
| } |
| return putMessageResult; |
| } |
| |
| @Override |
| public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { |
| final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); |
| |
| if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { |
| return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); |
| } |
| if (messageExtBatch.getDelayTimeLevel() > 0) { |
| return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); |
| } |
| |
| // Set the storage time |
| messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); |
| |
| StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); |
| |
| InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); |
| if (bornSocketAddress.getAddress() instanceof Inet6Address) { |
| messageExtBatch.setBornHostV6Flag(); |
| } |
| |
| InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); |
| if (storeSocketAddress.getAddress() instanceof Inet6Address) { |
| messageExtBatch.setStoreHostAddressV6Flag(); |
| } |
| |
| // Back to Results |
| AppendMessageResult appendResult; |
| BatchAppendFuture<AppendEntryResponse> dledgerFuture; |
| EncodeResult encodeResult; |
| |
| putMessageLock.lock(); //spin or ReentrantLock ,depending on store config |
| msgIdBuilder.setLength(0); |
| long elapsedTimeInLock; |
| long queueOffset; |
| long msgNum = 0; |
| try { |
| beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); |
| encodeResult = this.messageSerializer.serialize(messageExtBatch); |
| queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); |
| if (encodeResult.status != AppendMessageStatus.PUT_OK) { |
| return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult |
| .status)); |
| } |
| BatchAppendEntryRequest request = new BatchAppendEntryRequest(); |
| request.setGroup(dLedgerConfig.getGroup()); |
| request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); |
| request.setBatchMsgs(encodeResult.batchData); |
| dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request); |
| if (dledgerFuture.getPos() == -1) { |
| log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode()); |
| return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); |
| } |
| long wroteOffset = 0; |
| |
| int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; |
| ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); |
| |
| boolean isFirstOffset = true; |
| long firstWroteOffset = 0; |
| for (long pos : dledgerFuture.getPositions()) { |
| wroteOffset = pos + DLedgerEntry.BODY_OFFSET; |
| if (isFirstOffset) { |
| firstWroteOffset = wroteOffset; |
| isFirstOffset = false; |
| } |
| String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); |
| if (msgIdBuilder.length() > 0) { |
| msgIdBuilder.append(',').append(msgId); |
| } else { |
| msgIdBuilder.append(msgId); |
| } |
| msgNum++; |
| } |
| |
| elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; |
| appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, |
| msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); |
| DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum); |
| } catch (Exception e) { |
| log.error("Put message error", e); |
| return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus |
| .UNKNOWN_ERROR)); |
| } finally { |
| beginTimeInDledgerLock = 0; |
| putMessageLock.unlock(); |
| } |
| |
| if (elapsedTimeInLock > 500) { |
| log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", |
| elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); |
| } |
| |
| PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; |
| try { |
| AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); |
| switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { |
| case SUCCESS: |
| putMessageStatus = PutMessageStatus.PUT_OK; |
| break; |
| case INCONSISTENT_LEADER: |
| case NOT_LEADER: |
| case LEADER_NOT_READY: |
| case DISK_FULL: |
| putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; |
| break; |
| case WAIT_QUORUM_ACK_TIMEOUT: |
| //Do not return flush_slave_timeout to the client, for the ons client will ignore it. |
| putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; |
| break; |
| case LEADER_PENDING_FULL: |
| putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; |
| break; |
| } |
| } catch (Throwable t) { |
| log.error("Failed to get dledger append result", t); |
| } |
| |
| PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); |
| if (putMessageStatus == PutMessageStatus.PUT_OK) { |
| // Statistics |
| storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum); |
| storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen); |
| } |
| return putMessageResult; |
| } |
| |
| @Override |
| public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) { |
| |
| StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); |
| |
| final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); |
| |
| setMessageInfo(msg, tranType); |
| |
| final String finalTopic = msg.getTopic(); |
| |
| // Back to Results |
| AppendMessageResult appendResult; |
| AppendFuture<AppendEntryResponse> dledgerFuture; |
| EncodeResult encodeResult; |
| |
| encodeResult = this.messageSerializer.serialize(msg); |
| if (encodeResult.status != AppendMessageStatus.PUT_OK) { |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status))); |
| } |
| putMessageLock.lock(); //spin or ReentrantLock ,depending on store config |
| long elapsedTimeInLock; |
| long queueOffset; |
| try { |
| beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); |
| queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); |
| encodeResult.setQueueOffsetKey(queueOffset); |
| AppendEntryRequest request = new AppendEntryRequest(); |
| request.setGroup(dLedgerConfig.getGroup()); |
| request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); |
| request.setBody(encodeResult.getData()); |
| dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request); |
| if (dledgerFuture.getPos() == -1) { |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); |
| } |
| long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; |
| |
| int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; |
| ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); |
| |
| String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); |
| elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; |
| appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); |
| switch (tranType) { |
| case MessageSysFlag.TRANSACTION_PREPARED_TYPE: |
| case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: |
| break; |
| case MessageSysFlag.TRANSACTION_NOT_TYPE: |
| case MessageSysFlag.TRANSACTION_COMMIT_TYPE: |
| // The next update ConsumeQueue information |
| DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); |
| break; |
| default: |
| break; |
| } |
| } catch (Exception e) { |
| log.error("Put message error", e); |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); |
| } finally { |
| beginTimeInDledgerLock = 0; |
| putMessageLock.unlock(); |
| } |
| |
| if (elapsedTimeInLock > 500) { |
| log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); |
| } |
| |
| return dledgerFuture.thenApply(appendEntryResponse -> { |
| PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; |
| switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { |
| case SUCCESS: |
| putMessageStatus = PutMessageStatus.PUT_OK; |
| break; |
| case INCONSISTENT_LEADER: |
| case NOT_LEADER: |
| case LEADER_NOT_READY: |
| case DISK_FULL: |
| putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; |
| break; |
| case WAIT_QUORUM_ACK_TIMEOUT: |
| //Do not return flush_slave_timeout to the client, for the ons client will ignore it. |
| putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; |
| break; |
| case LEADER_PENDING_FULL: |
| putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; |
| break; |
| } |
| PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); |
| if (putMessageStatus == PutMessageStatus.PUT_OK) { |
| // Statistics |
| storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).incrementAndGet(); |
| storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).addAndGet(appendResult.getWroteBytes()); |
| } |
| return putMessageResult; |
| }); |
| } |
| |
| @Override |
| public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) { |
| final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); |
| |
| if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); |
| } |
| if (messageExtBatch.getDelayTimeLevel() > 0) { |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); |
| } |
| |
| // Set the storage time |
| messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); |
| |
| StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); |
| |
| InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); |
| if (bornSocketAddress.getAddress() instanceof Inet6Address) { |
| messageExtBatch.setBornHostV6Flag(); |
| } |
| |
| InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); |
| if (storeSocketAddress.getAddress() instanceof Inet6Address) { |
| messageExtBatch.setStoreHostAddressV6Flag(); |
| } |
| |
| // Back to Results |
| AppendMessageResult appendResult; |
| BatchAppendFuture<AppendEntryResponse> dledgerFuture; |
| EncodeResult encodeResult; |
| |
| encodeResult = this.messageSerializer.serialize(messageExtBatch); |
| if (encodeResult.status != AppendMessageStatus.PUT_OK) { |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult |
| .status))); |
| } |
| |
| putMessageLock.lock(); //spin or ReentrantLock ,depending on store config |
| msgIdBuilder.setLength(0); |
| long elapsedTimeInLock; |
| long queueOffset; |
| long msgNum = 0; |
| try { |
| beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); |
| queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey); |
| BatchAppendEntryRequest request = new BatchAppendEntryRequest(); |
| request.setGroup(dLedgerConfig.getGroup()); |
| request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); |
| request.setBatchMsgs(encodeResult.batchData); |
| dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request); |
| if (dledgerFuture.getPos() == -1) { |
| log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode()); |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); |
| } |
| long wroteOffset = 0; |
| |
| int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; |
| ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); |
| |
| boolean isFirstOffset = true; |
| long firstWroteOffset = 0; |
| for (long pos : dledgerFuture.getPositions()) { |
| wroteOffset = pos + DLedgerEntry.BODY_OFFSET; |
| if (isFirstOffset) { |
| firstWroteOffset = wroteOffset; |
| isFirstOffset = false; |
| } |
| String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); |
| if (msgIdBuilder.length() > 0) { |
| msgIdBuilder.append(',').append(msgId); |
| } else { |
| msgIdBuilder.append(msgId); |
| } |
| msgNum++; |
| } |
| |
| elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; |
| appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, |
| msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); |
| DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum); |
| } catch (Exception e) { |
| log.error("Put message error", e); |
| return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); |
| } finally { |
| beginTimeInDledgerLock = 0; |
| putMessageLock.unlock(); |
| } |
| |
| if (elapsedTimeInLock > 500) { |
| log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", |
| elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); |
| } |
| |
| return dledgerFuture.thenApply(appendEntryResponse -> { |
| PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; |
| switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { |
| case SUCCESS: |
| putMessageStatus = PutMessageStatus.PUT_OK; |
| break; |
| case INCONSISTENT_LEADER: |
| case NOT_LEADER: |
| case LEADER_NOT_READY: |
| case DISK_FULL: |
| putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; |
| break; |
| case WAIT_QUORUM_ACK_TIMEOUT: |
| //Do not return flush_slave_timeout to the client, for the ons client will ignore it. |
| putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; |
| break; |
| case LEADER_PENDING_FULL: |
| putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; |
| break; |
| } |
| PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); |
| if (putMessageStatus == PutMessageStatus.PUT_OK) { |
| // Statistics |
| storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).incrementAndGet(); |
| storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(appendResult.getWroteBytes()); |
| } |
| return putMessageResult; |
| }); |
| } |
| |
| @Override |
| public SelectMappedBufferResult getMessage(final long offset, final int size) { |
| if (offset < dividedCommitlogOffset) { |
| return super.getMessage(offset, size); |
| } |
| int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); |
| MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); |
| if (mappedFile != null) { |
| int pos = (int) (offset % mappedFileSize); |
| return convertSbr(mappedFile.selectMappedBuffer(pos, size)); |
| } |
| return null; |
| } |
| |
| @Override |
| public long rollNextFile(final long offset) { |
| int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); |
| return offset + mappedFileSize - offset % mappedFileSize; |
| } |
| |
| @Override |
| public HashMap<String, Long> getTopicQueueTable() { |
| return topicQueueTable; |
| } |
| |
| @Override |
| public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) { |
| this.topicQueueTable = topicQueueTable; |
| } |
| |
| @Override |
| public void destroy() { |
| super.destroy(); |
| dLedgerFileList.destroy(); |
| } |
| |
| @Override |
| public boolean appendData(long startOffset, byte[] data) { |
| //the old ha service will invoke method, here to prevent it |
| return false; |
| } |
| |
| @Override |
| public void checkSelf() { |
| dLedgerFileList.checkSelf(); |
| } |
| |
| @Override |
| public long lockTimeMills() { |
| long diff = 0; |
| long begin = this.beginTimeInDledgerLock; |
| if (begin > 0) { |
| diff = this.defaultMessageStore.now() - begin; |
| } |
| |
| if (diff < 0) { |
| diff = 0; |
| } |
| |
| return diff; |
| } |
| |
| private long getQueueOffsetByKey(String key, int tranType) { |
| Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key); |
| if (null == queueOffset) { |
| queueOffset = 0L; |
| DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset); |
| } |
| |
| // Transaction messages that require special handling |
| switch (tranType) { |
| // Prepared and Rollback message is not consumed, will not enter the |
| // consumer queuec |
| case MessageSysFlag.TRANSACTION_PREPARED_TYPE: |
| case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: |
| queueOffset = 0L; |
| break; |
| case MessageSysFlag.TRANSACTION_NOT_TYPE: |
| case MessageSysFlag.TRANSACTION_COMMIT_TYPE: |
| default: |
| break; |
| } |
| return queueOffset; |
| } |
| |
| |
| class EncodeResult { |
| private String queueOffsetKey; |
| private ByteBuffer data; |
| private List<byte[]> batchData; |
| private AppendMessageStatus status; |
| private int totalMsgLen; |
| |
| public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOffsetKey) { |
| this.data = data; |
| this.status = status; |
| this.queueOffsetKey = queueOffsetKey; |
| } |
| |
| public void setQueueOffsetKey(long offset) { |
| data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset); |
| } |
| |
| public byte[] getData() { |
| return data.array(); |
| } |
| |
| public EncodeResult(AppendMessageStatus status, String queueOffsetKey, List<byte[]> batchData, int totalMsgLen) { |
| this.batchData = batchData; |
| this.status = status; |
| this.queueOffsetKey = queueOffsetKey; |
| this.totalMsgLen = totalMsgLen; |
| } |
| } |
| |
| class MessageSerializer { |
| |
| // The maximum length of the message |
| private final int maxMessageSize; |
| // Build Message Key |
| private final StringBuilder keyBuilder = new StringBuilder(); |
| |
| MessageSerializer(final int size) { |
| this.maxMessageSize = size; |
| } |
| |
| public EncodeResult serialize(final MessageExtBrokerInner msgInner) { |
| // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> |
| |
| // PHY OFFSET |
| long wroteOffset = 0; |
| |
| long queueOffset = 0; |
| |
| int sysflag = msgInner.getSysFlag(); |
| |
| int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; |
| int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; |
| ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); |
| ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); |
| |
| String key = msgInner.getTopic() + "-" + msgInner.getQueueId(); |
| |
| /** |
| * Serialize message |
| */ |
| final byte[] propertiesData = |
| msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); |
| |
| final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; |
| |
| if (propertiesLength > Short.MAX_VALUE) { |
| log.warn("putMessage message properties length too long. length={}", propertiesData.length); |
| return new EncodeResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED, null, key); |
| } |
| |
| final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); |
| final int topicLength = topicData.length; |
| |
| final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; |
| |
| final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength); |
| |
| ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen); |
| |
| // Exceeds the maximum message |
| if (msgLen > this.maxMessageSize) { |
| DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength |
| + ", maxMessageSize: " + this.maxMessageSize); |
| return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key); |
| } |
| // Initialization of storage space |
| this.resetByteBuffer(msgStoreItemMemory, msgLen); |
| // 1 TOTALSIZE |
| msgStoreItemMemory.putInt(msgLen); |
| // 2 MAGICCODE |
| msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE); |
| // 3 BODYCRC |
| msgStoreItemMemory.putInt(msgInner.getBodyCRC()); |
| // 4 QUEUEID |
| msgStoreItemMemory.putInt(msgInner.getQueueId()); |
| // 5 FLAG |
| msgStoreItemMemory.putInt(msgInner.getFlag()); |
| // 6 QUEUEOFFSET |
| msgStoreItemMemory.putLong(queueOffset); |
| // 7 PHYSICALOFFSET |
| msgStoreItemMemory.putLong(wroteOffset); |
| // 8 SYSFLAG |
| msgStoreItemMemory.putInt(msgInner.getSysFlag()); |
| // 9 BORNTIMESTAMP |
| msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); |
| // 10 BORNHOST |
| resetByteBuffer(bornHostHolder, bornHostLength); |
| msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); |
| // 11 STORETIMESTAMP |
| msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); |
| // 12 STOREHOSTADDRESS |
| resetByteBuffer(storeHostHolder, storeHostLength); |
| msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); |
| //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); |
| // 13 RECONSUMETIMES |
| msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); |
| // 14 Prepared Transaction Offset |
| msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); |
| // 15 BODY |
| msgStoreItemMemory.putInt(bodyLength); |
| if (bodyLength > 0) { |
| msgStoreItemMemory.put(msgInner.getBody()); |
| } |
| // 16 TOPIC |
| msgStoreItemMemory.put((byte) topicLength); |
| msgStoreItemMemory.put(topicData); |
| // 17 PROPERTIES |
| msgStoreItemMemory.putShort((short) propertiesLength); |
| if (propertiesLength > 0) { |
| msgStoreItemMemory.put(propertiesData); |
| } |
| return new EncodeResult(AppendMessageStatus.PUT_OK, msgStoreItemMemory, key); |
| } |
| |
| public EncodeResult serialize(final MessageExtBatch messageExtBatch) { |
| keyBuilder.setLength(0); |
| keyBuilder.append(messageExtBatch.getTopic()); |
| keyBuilder.append('-'); |
| keyBuilder.append(messageExtBatch.getQueueId()); |
| String key = keyBuilder.toString(); |
| |
| Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key); |
| if (null == queueOffset) { |
| queueOffset = 0L; |
| DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset); |
| } |
| |
| int totalMsgLen = 0; |
| ByteBuffer messagesByteBuff = messageExtBatch.wrap(); |
| List<byte[]> batchBody = new LinkedList<>(); |
| |
| int sysFlag = messageExtBatch.getSysFlag(); |
| int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; |
| int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; |
| ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); |
| ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); |
| |
| while (messagesByteBuff.hasRemaining()) { |
| // 1 TOTALSIZE |
| messagesByteBuff.getInt(); |
| // 2 MAGICCODE |
| messagesByteBuff.getInt(); |
| // 3 BODYCRC |
| messagesByteBuff.getInt(); |
| // 4 FLAG |
| int flag = messagesByteBuff.getInt(); |
| // 5 BODY |
| int bodyLen = messagesByteBuff.getInt(); |
| int bodyPos = messagesByteBuff.position(); |
| int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen); |
| messagesByteBuff.position(bodyPos + bodyLen); |
| // 6 properties |
| short propertiesLen = messagesByteBuff.getShort(); |
| int propertiesPos = messagesByteBuff.position(); |
| messagesByteBuff.position(propertiesPos + propertiesLen); |
| |
| final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); |
| |
| final int topicLength = topicData.length; |
| |
| final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen); |
| ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen); |
| |
| // Exceeds the maximum message |
| if (msgLen > this.maxMessageSize) { |
| CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + |
| bodyLen |
| + ", maxMessageSize: " + this.maxMessageSize); |
| throw new RuntimeException("message size exceeded"); |
| } |
| |
| totalMsgLen += msgLen; |
| // Determines whether there is sufficient free space |
| if (totalMsgLen > maxMessageSize) { |
| throw new RuntimeException("message size exceeded"); |
| } |
| |
| // Initialization of storage space |
| this.resetByteBuffer(msgStoreItemMemory, msgLen); |
| // 1 TOTALSIZE |
| msgStoreItemMemory.putInt(msgLen); |
| // 2 MAGICCODE |
| msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE); |
| // 3 BODYCRC |
| msgStoreItemMemory.putInt(bodyCrc); |
| // 4 QUEUEID |
| msgStoreItemMemory.putInt(messageExtBatch.getQueueId()); |
| // 5 FLAG |
| msgStoreItemMemory.putInt(flag); |
| // 6 QUEUEOFFSET |
| msgStoreItemMemory.putLong(queueOffset++); |
| // 7 PHYSICALOFFSET |
| msgStoreItemMemory.putLong(0); |
| // 8 SYSFLAG |
| msgStoreItemMemory.putInt(messageExtBatch.getSysFlag()); |
| // 9 BORNTIMESTAMP |
| msgStoreItemMemory.putLong(messageExtBatch.getBornTimestamp()); |
| // 10 BORNHOST |
| resetByteBuffer(bornHostHolder, bornHostLength); |
| msgStoreItemMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder)); |
| // 11 STORETIMESTAMP |
| msgStoreItemMemory.putLong(messageExtBatch.getStoreTimestamp()); |
| // 12 STOREHOSTADDRESS |
| resetByteBuffer(storeHostHolder, storeHostLength); |
| msgStoreItemMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder)); |
| // 13 RECONSUMETIMES |
| msgStoreItemMemory.putInt(messageExtBatch.getReconsumeTimes()); |
| // 14 Prepared Transaction Offset |
| msgStoreItemMemory.putLong(0); |
| // 15 BODY |
| msgStoreItemMemory.putInt(bodyLen); |
| if (bodyLen > 0) { |
| msgStoreItemMemory.put(messagesByteBuff.array(), bodyPos, bodyLen); |
| } |
| // 16 TOPIC |
| msgStoreItemMemory.put((byte) topicLength); |
| msgStoreItemMemory.put(topicData); |
| // 17 PROPERTIES |
| msgStoreItemMemory.putShort(propertiesLen); |
| if (propertiesLen > 0) { |
| msgStoreItemMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen); |
| } |
| byte[] data = new byte[msgLen]; |
| msgStoreItemMemory.clear(); |
| msgStoreItemMemory.get(data); |
| batchBody.add(data); |
| } |
| |
| return new EncodeResult(AppendMessageStatus.PUT_OK, key, batchBody, totalMsgLen); |
| } |
| |
| private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { |
| byteBuffer.flip(); |
| byteBuffer.limit(limit); |
| } |
| } |
| |
| public static class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult { |
| |
| private SelectMmapBufferResult sbr; |
| |
| public DLedgerSelectMappedBufferResult(SelectMmapBufferResult sbr) { |
| super(sbr.getStartOffset(), sbr.getByteBuffer(), sbr.getSize(), null); |
| this.sbr = sbr; |
| } |
| |
| public synchronized void release() { |
| super.release(); |
| if (sbr != null) { |
| sbr.release(); |
| } |
| } |
| |
| } |
| |
| public DLedgerServer getdLedgerServer() { |
| return dLedgerServer; |
| } |
| |
| public int getId() { |
| return id; |
| } |
| |
| public long getDividedCommitlogOffset() { |
| return dividedCommitlogOffset; |
| } |
| } |