| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.rocketmq.tieredstore.provider.posix; |
| |
| import com.google.common.base.Stopwatch; |
| import com.google.common.io.ByteStreams; |
| import io.opentelemetry.api.common.Attributes; |
| import io.opentelemetry.api.common.AttributesBuilder; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.rocketmq.common.message.MessageQueue; |
| import org.apache.rocketmq.logging.org.slf4j.Logger; |
| import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; |
| import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; |
| import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; |
| import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; |
| import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; |
| import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; |
| import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; |
| |
| import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE; |
| import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_OPERATION; |
| import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_SUCCESS; |
| import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC; |
| |
| /** |
| * this class is experimental and may change without notice. |
| */ |
| public class PosixFileSegment extends TieredFileSegment { |
| private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); |
| |
| private static final String OPERATION_POSIX_READ = "read"; |
| private static final String OPERATION_POSIX_WRITE = "write"; |
| |
| private final String basePath; |
| private final String filepath; |
| |
| private volatile File file; |
| private volatile FileChannel readFileChannel; |
| private volatile FileChannel writeFileChannel; |
| |
| public PosixFileSegment(FileSegmentType fileType, MessageQueue messageQueue, |
| long baseOffset, TieredMessageStoreConfig storeConfig) { |
| super(fileType, messageQueue, baseOffset, storeConfig); |
| |
| String basePath = storeConfig.getTieredStoreFilepath(); |
| if (StringUtils.isBlank(basePath) || basePath.endsWith(File.separator)) { |
| this.basePath = basePath; |
| } else { |
| this.basePath = basePath + File.separator; |
| } |
| this.filepath = this.basePath |
| + TieredStoreUtil.getHash(storeConfig.getBrokerClusterName()) + "_" + storeConfig.getBrokerClusterName() + File.separator |
| + messageQueue.getBrokerName() + File.separator |
| + messageQueue.getTopic() + File.separator |
| + messageQueue.getQueueId() + File.separator |
| + fileType + File.separator |
| + TieredStoreUtil.offset2FileName(baseOffset); |
| createFile(); |
| } |
| |
| protected AttributesBuilder newAttributesBuilder() { |
| return TieredStoreMetricsManager.newAttributesBuilder() |
| .put(LABEL_TOPIC, messageQueue.getTopic()) |
| .put(LABEL_FILE_TYPE, fileType.name().toLowerCase()); |
| } |
| |
| @Override |
| public String getPath() { |
| return filepath; |
| } |
| |
| @Override |
| public long getSize() { |
| if (exists()) { |
| return file.length(); |
| } |
| return -1; |
| } |
| |
| @Override |
| public boolean exists() { |
| return file != null && file.exists(); |
| } |
| |
| @Override |
| public void createFile() { |
| if (file == null) { |
| synchronized (this) { |
| if (file == null) { |
| File file = new File(filepath); |
| try { |
| File dir = file.getParentFile(); |
| if (!dir.exists()) { |
| dir.mkdirs(); |
| } |
| |
| // TODO use direct IO to avoid polluting the page cache |
| file.createNewFile(); |
| this.readFileChannel = new RandomAccessFile(file, "r").getChannel(); |
| this.writeFileChannel = new RandomAccessFile(file, "rwd").getChannel(); |
| this.file = file; |
| } catch (Exception e) { |
| logger.error("PosixFileSegment#createFile: create file {} failed: ", filepath, e); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void destroyFile() { |
| try { |
| if (readFileChannel != null && readFileChannel.isOpen()) { |
| readFileChannel.close(); |
| } |
| if (writeFileChannel != null && writeFileChannel.isOpen()) { |
| writeFileChannel.close(); |
| } |
| } catch (IOException e) { |
| logger.error("PosixFileSegment#destroyFile: destroy file {} failed: ", filepath, e); |
| } |
| |
| if (file.exists()) { |
| file.delete(); |
| } |
| } |
| |
| @Override |
| public CompletableFuture<ByteBuffer> read0(long position, int length) { |
| Stopwatch stopwatch = Stopwatch.createStarted(); |
| AttributesBuilder attributesBuilder = newAttributesBuilder() |
| .put(LABEL_OPERATION, OPERATION_POSIX_READ); |
| |
| CompletableFuture<ByteBuffer> future = new CompletableFuture<>(); |
| ByteBuffer byteBuffer = ByteBuffer.allocate(length); |
| try { |
| readFileChannel.position(position); |
| readFileChannel.read(byteBuffer); |
| byteBuffer.flip(); |
| |
| attributesBuilder.put(LABEL_SUCCESS, true); |
| long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); |
| TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build()); |
| |
| Attributes metricsAttributes = newAttributesBuilder() |
| .put(LABEL_OPERATION, OPERATION_POSIX_READ) |
| .build(); |
| int downloadedBytes = byteBuffer.remaining(); |
| TieredStoreMetricsManager.downloadBytes.record(downloadedBytes, metricsAttributes); |
| |
| future.complete(byteBuffer); |
| } catch (IOException e) { |
| long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); |
| attributesBuilder.put(LABEL_SUCCESS, false); |
| TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build()); |
| logger.error("PosixFileSegment#read0: read file {} failed: position: {}, length: {}", |
| filepath, position, length, e); |
| future.completeExceptionally(e); |
| } |
| return future; |
| } |
| |
| @Override |
| public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length, |
| boolean append) { |
| Stopwatch stopwatch = Stopwatch.createStarted(); |
| AttributesBuilder attributesBuilder = newAttributesBuilder() |
| .put(LABEL_OPERATION, OPERATION_POSIX_WRITE); |
| |
| CompletableFuture<Boolean> future = new CompletableFuture<>(); |
| try { |
| TieredStoreExecutor.commitExecutor.execute(() -> { |
| try { |
| byte[] byteArray = ByteStreams.toByteArray(inputStream); |
| if (byteArray.length != length) { |
| logger.error("PosixFileSegment#commit0: append file {} failed: real data size: {}, is not equal to length: {}", |
| filepath, byteArray.length, length); |
| future.complete(false); |
| return; |
| } |
| writeFileChannel.position(position); |
| ByteBuffer buffer = ByteBuffer.wrap(byteArray); |
| while (buffer.hasRemaining()) { |
| writeFileChannel.write(buffer); |
| } |
| |
| attributesBuilder.put(LABEL_SUCCESS, true); |
| long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); |
| TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build()); |
| |
| Attributes metricsAttributes = newAttributesBuilder() |
| .put(LABEL_OPERATION, OPERATION_POSIX_WRITE) |
| .build(); |
| TieredStoreMetricsManager.uploadBytes.record(length, metricsAttributes); |
| |
| future.complete(true); |
| } catch (Exception e) { |
| long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); |
| attributesBuilder.put(LABEL_SUCCESS, false); |
| TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build()); |
| |
| logger.error("PosixFileSegment#commit0: append file {} failed: position: {}, length: {}", |
| filepath, position, length, e); |
| future.completeExceptionally(e); |
| } |
| }); |
| } catch (Exception e) { |
| // commit task cannot be executed |
| long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); |
| attributesBuilder.put(LABEL_SUCCESS, false); |
| TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build()); |
| |
| future.completeExceptionally(e); |
| } |
| return future; |
| } |
| } |