| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.rocketmq.tieredstore.file; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.stream.Collectors; |
| import javax.annotation.Nullable; |
| import org.apache.rocketmq.common.BoundaryType; |
| import org.apache.rocketmq.logging.org.slf4j.Logger; |
| import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; |
| import org.apache.rocketmq.tieredstore.common.AppendResult; |
| import org.apache.rocketmq.tieredstore.common.FileSegmentType; |
| import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode; |
| import org.apache.rocketmq.tieredstore.exception.TieredStoreException; |
| import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata; |
| import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; |
| import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator; |
| import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; |
| import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; |
| |
| public class TieredFlatFile { |
| |
| private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); |
| |
| private final String filePath; |
| private final FileSegmentType fileType; |
| private final TieredMetadataStore tieredMetadataStore; |
| |
| private volatile long baseOffset = -1L; |
| private final FileSegmentAllocator fileSegmentAllocator; |
| private final List<TieredFileSegment> fileSegmentList; |
| private final List<TieredFileSegment> needCommitFileSegmentList; |
| private final ReentrantReadWriteLock fileSegmentLock; |
| |
| public TieredFlatFile(FileSegmentAllocator fileSegmentAllocator, |
| FileSegmentType fileType, String filePath) { |
| |
| this.fileType = fileType; |
| this.filePath = filePath; |
| this.fileSegmentList = new LinkedList<>(); |
| this.fileSegmentLock = new ReentrantReadWriteLock(); |
| this.fileSegmentAllocator = fileSegmentAllocator; |
| this.needCommitFileSegmentList = new CopyOnWriteArrayList<>(); |
| this.tieredMetadataStore = TieredStoreUtil.getMetadataStore(fileSegmentAllocator.getStoreConfig()); |
| this.recoverMetadata(); |
| |
| if (fileType != FileSegmentType.INDEX) { |
| checkAndFixFileSize(); |
| } |
| } |
| |
| public long getBaseOffset() { |
| return baseOffset; |
| } |
| |
| public void setBaseOffset(long baseOffset) { |
| if (fileSegmentList.size() > 0) { |
| throw new IllegalStateException("Can not set base offset after file segment has been created"); |
| } |
| this.baseOffset = baseOffset; |
| } |
| |
| public long getMinOffset() { |
| fileSegmentLock.readLock().lock(); |
| try { |
| if (fileSegmentList.isEmpty()) { |
| return baseOffset; |
| } |
| return fileSegmentList.get(0).getBaseOffset(); |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| } |
| |
| public long getCommitOffset() { |
| fileSegmentLock.readLock().lock(); |
| try { |
| if (fileSegmentList.isEmpty()) { |
| return baseOffset; |
| } |
| return fileSegmentList.get(fileSegmentList.size() - 1).getCommitOffset(); |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| } |
| |
| public long getMaxOffset() { |
| fileSegmentLock.readLock().lock(); |
| try { |
| if (fileSegmentList.isEmpty()) { |
| return baseOffset; |
| } |
| return fileSegmentList.get(fileSegmentList.size() - 1).getMaxOffset(); |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| } |
| |
| public long getDispatchCommitOffset() { |
| fileSegmentLock.readLock().lock(); |
| try { |
| if (fileSegmentList.isEmpty()) { |
| return 0; |
| } |
| return fileSegmentList.get(fileSegmentList.size() - 1).getDispatchCommitOffset(); |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| } |
| |
| public String getFilePath() { |
| return filePath; |
| } |
| |
| public FileSegmentType getFileType() { |
| return fileType; |
| } |
| |
| @VisibleForTesting |
| public List<TieredFileSegment> getFileSegmentList() { |
| return fileSegmentList; |
| } |
| |
| protected void recoverMetadata() { |
| fileSegmentList.clear(); |
| needCommitFileSegmentList.clear(); |
| |
| tieredMetadataStore.iterateFileSegment(filePath, fileType, metadata -> { |
| if (metadata.getStatus() == FileSegmentMetadata.STATUS_DELETED) { |
| return; |
| } |
| |
| TieredFileSegment segment = this.newSegment(fileType, metadata.getBaseOffset(), false); |
| segment.initPosition(metadata.getSize()); |
| segment.setMinTimestamp(metadata.getBeginTimestamp()); |
| segment.setMaxTimestamp(metadata.getEndTimestamp()); |
| if (metadata.getStatus() == FileSegmentMetadata.STATUS_SEALED) { |
| segment.setFull(false); |
| } |
| |
| // TODO check coda/size |
| fileSegmentList.add(segment); |
| }); |
| |
| if (!fileSegmentList.isEmpty()) { |
| fileSegmentList.sort(Comparator.comparingLong(TieredFileSegment::getBaseOffset)); |
| baseOffset = fileSegmentList.get(0).getBaseOffset(); |
| needCommitFileSegmentList.addAll( |
| fileSegmentList.stream().filter(segment -> !segment.isFull()).collect(Collectors.toList())); |
| } |
| } |
| |
| /** |
| * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full |
| */ |
| public void updateFileSegment(TieredFileSegment fileSegment) { |
| |
| FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( |
| this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); |
| |
| // Note: file segment path may not the same as file base path, use base path here. |
| if (metadata == null) { |
| metadata = new FileSegmentMetadata( |
| this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType()); |
| metadata.setCreateTimestamp(System.currentTimeMillis()); |
| } |
| |
| metadata.setSize(fileSegment.getCommitPosition()); |
| metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); |
| metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); |
| |
| if (fileSegment.isFull() && !fileSegment.needCommit()) { |
| if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) { |
| metadata.markSealed(); |
| } |
| } |
| |
| if (fileSegment.isClosed()) { |
| metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); |
| } |
| |
| this.tieredMetadataStore.updateFileSegment(metadata); |
| } |
| |
| private void checkAndFixFileSize() { |
| for (int i = 1; i < fileSegmentList.size(); i++) { |
| TieredFileSegment pre = fileSegmentList.get(i - 1); |
| TieredFileSegment cur = fileSegmentList.get(i); |
| if (pre.getCommitOffset() != cur.getBaseOffset()) { |
| logger.warn("TieredFlatFile#checkAndFixFileSize: file segment has incorrect size: " + |
| "filePath:{}, file type: {}, base offset: {}", filePath, fileType, pre.getBaseOffset()); |
| try { |
| long actualSize = pre.getSize(); |
| if (pre.getBaseOffset() + actualSize != cur.getBaseOffset()) { |
| logger.error("[Bug]TieredFlatFile#checkAndFixFileSize: " + |
| "file segment has incorrect size and can not fix: " + |
| "filePath:{}, file type: {}, base offset: {}, actual size: {}, next file offset: {}", |
| filePath, fileType, pre.getBaseOffset(), actualSize, cur.getBaseOffset()); |
| continue; |
| } |
| pre.initPosition(actualSize); |
| this.updateFileSegment(pre); |
| } catch (Exception e) { |
| logger.error("TieredFlatFile#checkAndFixFileSize: " + |
| "fix file segment size failed: filePath: {}, file type: {}, base offset: {}", |
| filePath, fileType, pre.getBaseOffset()); |
| } |
| } |
| } |
| |
| if (!fileSegmentList.isEmpty()) { |
| TieredFileSegment lastFile = fileSegmentList.get(fileSegmentList.size() - 1); |
| long lastFileSize = lastFile.getSize(); |
| if (lastFile.getCommitPosition() != lastFileSize) { |
| logger.warn("TieredFlatFile#checkAndFixFileSize: fix last file {} size: origin: {}, actual: {}", |
| lastFile.getPath(), lastFile.getCommitOffset() - lastFile.getBaseOffset(), lastFileSize); |
| lastFile.initPosition(lastFileSize); |
| this.updateFileSegment(lastFile); |
| } |
| } |
| } |
| |
| private TieredFileSegment newSegment(FileSegmentType fileType, long baseOffset, boolean createMetadata) { |
| TieredFileSegment segment = null; |
| try { |
| segment = fileSegmentAllocator.createSegment(fileType, filePath, baseOffset); |
| if (fileType != FileSegmentType.INDEX) { |
| segment.createFile(); |
| } |
| if (createMetadata) { |
| this.updateFileSegment(segment); |
| } |
| } catch (Exception e) { |
| logger.error("create file segment failed: filePath:{}, file type: {}, base offset: {}", |
| filePath, fileType, baseOffset, e); |
| } |
| return segment; |
| } |
| |
| public void rollingNewFile() { |
| TieredFileSegment segment = getFileToWrite(); |
| segment.setFull(); |
| // create new segment |
| getFileToWrite(); |
| } |
| |
| public int getFileSegmentCount() { |
| return fileSegmentList.size(); |
| } |
| |
| @Nullable |
| protected TieredFileSegment getFileByIndex(int index) { |
| fileSegmentLock.readLock().lock(); |
| try { |
| if (index < fileSegmentList.size()) { |
| return fileSegmentList.get(index); |
| } |
| return null; |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| } |
| |
| protected TieredFileSegment getFileToWrite() { |
| if (baseOffset == -1) { |
| throw new IllegalStateException("need to set base offset before create file segment"); |
| } |
| fileSegmentLock.readLock().lock(); |
| try { |
| if (!fileSegmentList.isEmpty()) { |
| TieredFileSegment fileSegment = fileSegmentList.get(fileSegmentList.size() - 1); |
| if (!fileSegment.isFull()) { |
| return fileSegment; |
| } |
| } |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| // Create new file segment |
| fileSegmentLock.writeLock().lock(); |
| try { |
| long offset = baseOffset; |
| if (!fileSegmentList.isEmpty()) { |
| TieredFileSegment segment = fileSegmentList.get(fileSegmentList.size() - 1); |
| if (!segment.isFull()) { |
| return segment; |
| } |
| if (segment.commit()) { |
| try { |
| this.updateFileSegment(segment); |
| } catch (Exception e) { |
| return segment; |
| } |
| } else { |
| return segment; |
| } |
| |
| offset = segment.getMaxOffset(); |
| } |
| TieredFileSegment fileSegment = this.newSegment(fileType, offset, true); |
| fileSegmentList.add(fileSegment); |
| needCommitFileSegmentList.add(fileSegment); |
| Collections.sort(fileSegmentList); |
| logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", |
| offset, fileSegment.getPath(), fileType); |
| return fileSegment; |
| } finally { |
| fileSegmentLock.writeLock().unlock(); |
| } |
| } |
| |
| @Nullable |
| protected TieredFileSegment getFileByTime(long timestamp, BoundaryType boundaryType) { |
| fileSegmentLock.readLock().lock(); |
| try { |
| List<TieredFileSegment> segmentList = fileSegmentList.stream() |
| .sorted(boundaryType == BoundaryType.UPPER ? Comparator.comparingLong(TieredFileSegment::getMaxTimestamp) : Comparator.comparingLong(TieredFileSegment::getMinTimestamp)) |
| .filter(segment -> boundaryType == BoundaryType.UPPER ? segment.getMaxTimestamp() >= timestamp : segment.getMinTimestamp() <= timestamp) |
| .collect(Collectors.toList()); |
| if (!segmentList.isEmpty()) { |
| return boundaryType == BoundaryType.UPPER ? segmentList.get(0) : segmentList.get(segmentList.size() - 1); |
| } |
| if (fileSegmentList.isEmpty()) { |
| return null; |
| } |
| return boundaryType == BoundaryType.UPPER ? fileSegmentList.get(fileSegmentList.size() - 1) : fileSegmentList.get(0); |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| } |
| |
| protected List<TieredFileSegment> getFileListByTime(long beginTime, long endTime) { |
| fileSegmentLock.readLock().lock(); |
| try { |
| return fileSegmentList.stream() |
| .filter(segment -> Math.max(beginTime, segment.getMinTimestamp()) <= Math.min(endTime, segment.getMaxTimestamp())) |
| .collect(Collectors.toList()); |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| } |
| |
| protected int getSegmentIndexByOffset(long offset) { |
| fileSegmentLock.readLock().lock(); |
| try { |
| if (fileSegmentList.size() == 0) { |
| return -1; |
| } |
| |
| int left = 0; |
| int right = fileSegmentList.size() - 1; |
| int mid = (left + right) / 2; |
| |
| long firstSegmentOffset = fileSegmentList.get(left).getBaseOffset(); |
| long lastSegmentOffset = fileSegmentList.get(right).getCommitOffset(); |
| long midSegmentOffset = fileSegmentList.get(mid).getBaseOffset(); |
| |
| if (offset < firstSegmentOffset || offset > lastSegmentOffset) { |
| return -1; |
| } |
| |
| while (left < right - 1) { |
| if (offset == midSegmentOffset) { |
| return mid; |
| } |
| if (offset < midSegmentOffset) { |
| right = mid; |
| } else { |
| left = mid; |
| } |
| mid = (left + right) / 2; |
| midSegmentOffset = fileSegmentList.get(mid).getBaseOffset(); |
| } |
| return offset < fileSegmentList.get(right).getBaseOffset() ? mid : right; |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| } |
| |
| public AppendResult append(ByteBuffer byteBuf) { |
| return append(byteBuf, Long.MAX_VALUE, false); |
| } |
| |
| public AppendResult append(ByteBuffer byteBuf, long timeStamp) { |
| return append(byteBuf, timeStamp, false); |
| } |
| |
| public AppendResult append(ByteBuffer byteBuf, long timeStamp, boolean commit) { |
| TieredFileSegment fileSegment = getFileToWrite(); |
| AppendResult result = fileSegment.append(byteBuf, timeStamp); |
| if (commit && result == AppendResult.BUFFER_FULL && fileSegment.commit()) { |
| result = fileSegment.append(byteBuf, timeStamp); |
| } |
| if (result == AppendResult.FILE_FULL) { |
| // write to new file |
| return getFileToWrite().append(byteBuf, timeStamp); |
| } |
| return result; |
| } |
| |
| public int cleanExpiredFile(long expireTimestamp) { |
| Set<Long> needToDeleteSet = new HashSet<>(); |
| try { |
| tieredMetadataStore.iterateFileSegment(filePath, fileType, metadata -> { |
| if (metadata.getEndTimestamp() < expireTimestamp) { |
| needToDeleteSet.add(metadata.getBaseOffset()); |
| } |
| }); |
| } catch (Exception e) { |
| logger.error("Clean expired file, filePath: {}, file type: {}, expire timestamp: {}", |
| filePath, fileType, expireTimestamp); |
| } |
| |
| if (needToDeleteSet.isEmpty()) { |
| return 0; |
| } |
| |
| fileSegmentLock.writeLock().lock(); |
| try { |
| for (int i = 0; i < fileSegmentList.size(); i++) { |
| TieredFileSegment fileSegment = fileSegmentList.get(i); |
| try { |
| if (needToDeleteSet.contains(fileSegment.getBaseOffset())) { |
| fileSegment.close(); |
| fileSegmentList.remove(fileSegment); |
| needCommitFileSegmentList.remove(fileSegment); |
| i--; |
| this.updateFileSegment(fileSegment); |
| logger.debug("Clean expired file, filePath: {}", fileSegment.getPath()); |
| } else { |
| break; |
| } |
| } catch (Exception e) { |
| logger.error("Clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}", |
| fileSegment.getPath(), fileSegment.getFileType(), expireTimestamp, e); |
| } |
| } |
| if (fileSegmentList.size() > 0) { |
| baseOffset = fileSegmentList.get(0).getBaseOffset(); |
| } else if (fileType == FileSegmentType.CONSUME_QUEUE) { |
| baseOffset = -1; |
| } else { |
| baseOffset = 0; |
| } |
| } finally { |
| fileSegmentLock.writeLock().unlock(); |
| } |
| return needToDeleteSet.size(); |
| } |
| |
| @VisibleForTesting |
| protected List<TieredFileSegment> getNeedCommitFileSegmentList() { |
| return needCommitFileSegmentList; |
| } |
| |
| public void destroyExpiredFile() { |
| try { |
| tieredMetadataStore.iterateFileSegment(filePath, fileType, metadata -> { |
| if (metadata.getStatus() == FileSegmentMetadata.STATUS_DELETED) { |
| try { |
| TieredFileSegment fileSegment = |
| this.newSegment(fileType, metadata.getBaseOffset(), false); |
| fileSegment.destroyFile(); |
| if (!fileSegment.exists()) { |
| tieredMetadataStore.deleteFileSegment(filePath, fileType, metadata.getBaseOffset()); |
| } |
| } catch (Exception e) { |
| logger.error("Destroyed expired file failed, file path: {}, file type: {}", |
| filePath, fileType, e); |
| } |
| } |
| }); |
| } catch (Exception e) { |
| logger.error("Destroyed expired file, file path: {}, file type: {}", filePath, fileType); |
| } |
| } |
| |
| public void commit(boolean sync) { |
| ArrayList<CompletableFuture<Void>> futureList = new ArrayList<>(); |
| try { |
| for (TieredFileSegment segment : needCommitFileSegmentList) { |
| if (segment.isClosed()) { |
| continue; |
| } |
| futureList.add(segment |
| .commitAsync() |
| .thenAccept(success -> { |
| try { |
| this.updateFileSegment(segment); |
| } catch (Exception e) { |
| // TODO handle update segment metadata failed exception |
| logger.error("Update file segment metadata failed: " + |
| "file path: {}, file type: {}, base offset: {}", |
| filePath, fileType, segment.getBaseOffset(), e); |
| } |
| if (segment.isFull() && !segment.needCommit()) { |
| needCommitFileSegmentList.remove(segment); |
| } |
| }) |
| ); |
| } |
| } catch (Exception e) { |
| logger.error("Commit file segment failed: topic: {}, queue: {}, file type: {}", filePath, fileType, e); |
| } |
| if (sync) { |
| CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); |
| } |
| } |
| |
| public CompletableFuture<ByteBuffer> readAsync(long offset, int length) { |
| int index = getSegmentIndexByOffset(offset); |
| if (index == -1) { |
| String errorMsg = String.format("TieredFlatFile#readAsync: offset is illegal, " + |
| "file path: %s, file type: %s, start: %d, length: %d, file num: %d", |
| filePath, fileType, offset, length, fileSegmentList.size()); |
| logger.error(errorMsg); |
| throw new TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, errorMsg); |
| } |
| TieredFileSegment fileSegment1; |
| TieredFileSegment fileSegment2 = null; |
| fileSegmentLock.readLock().lock(); |
| try { |
| fileSegment1 = fileSegmentList.get(index); |
| if (offset + length > fileSegment1.getCommitOffset()) { |
| if (fileSegmentList.size() > index + 1) { |
| fileSegment2 = fileSegmentList.get(index + 1); |
| } |
| } |
| } finally { |
| fileSegmentLock.readLock().unlock(); |
| } |
| if (fileSegment2 == null) { |
| return fileSegment1.readAsync(offset - fileSegment1.getBaseOffset(), length); |
| } |
| int segment1Length = (int) (fileSegment1.getCommitOffset() - offset); |
| return fileSegment1.readAsync(offset - fileSegment1.getBaseOffset(), segment1Length) |
| .thenCombine(fileSegment2.readAsync(0, length - segment1Length), (buffer1, buffer2) -> { |
| ByteBuffer compositeBuffer = ByteBuffer.allocate(buffer1.remaining() + buffer2.remaining()); |
| compositeBuffer.put(buffer1).put(buffer2); |
| compositeBuffer.flip(); |
| return compositeBuffer; |
| }); |
| } |
| |
| public void destroy() { |
| fileSegmentLock.writeLock().lock(); |
| try { |
| for (TieredFileSegment fileSegment : fileSegmentList) { |
| fileSegment.close(); |
| try { |
| this.updateFileSegment(fileSegment); |
| } catch (Exception e) { |
| logger.error("TieredFlatFile#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e); |
| } |
| fileSegment.destroyFile(); |
| if (!fileSegment.exists()) { |
| tieredMetadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset()); |
| } |
| } |
| fileSegmentList.clear(); |
| needCommitFileSegmentList.clear(); |
| } finally { |
| fileSegmentLock.writeLock().unlock(); |
| } |
| } |
| } |