Enable WAL compression; remove metrics in MemTableFlushTask; cache hash code in PartialPath; use GZIP to compress WAL; batch-update metrics
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 1380a16..ad180cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -94,6 +94,8 @@
       "([" + PATH_SEPARATOR + "])?" + NODE_NAME_MATCHER + "(" + PARTIAL_NODE_MATCHER + ")*";
   public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER);
 
+  /** Whether WAL buffers are compressed before being written to disk. */
+  private boolean enableWALCompression = true;
   /** Whether to enable the mqtt service. */
   private boolean enableMQTTService = false;
 
@@ -3829,4 +3831,12 @@
       double innerCompactionTaskSelectionDiskRedundancy) {
     this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy;
   }
+
+  public boolean isEnableWALCompression() {
+    return enableWALCompression;
+  }
+
+  public void setEnableWALCompression(boolean enableWALCompression) {
+    this.enableWALCompression = enableWALCompression;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 139a137..3b71dc7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -412,6 +412,11 @@ "io_task_queue_size_for_flushing", Integer.toString(conf.getIoTaskQueueSizeForFlushing())))); + conf.setEnableWALCompression( + Boolean.parseBoolean( + properties.getProperty( + "enable_wal_compression", Boolean.toString(conf.isEnableWALCompression())))); + conf.setCompactionScheduleIntervalInMs( Long.parseLong( properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index b0d8aad..2bc6352 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -52,6 +52,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class InsertRowNode extends InsertNode implements WALEntryValue {
 
@@ -67,6 +68,12 @@
   private Object[] values;
 
   private boolean isNeedInferType = false;
+  /**
+   * Counter that the memtable adds the number of inserted points to. DataRegion replaces it with
+   * the counter of the enclosing InsertRowsNode for batched rows; the default instance keeps
+   * rows that are inserted without such a batch from hitting a null counter.
+   */
+  public AtomicInteger insertCount = new AtomicInteger(0);
 
   public InsertRowNode(PlanNodeId id) {
     super(id);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java index 12e2294..a2950e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -42,6 +42,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class InsertRowsNode extends InsertNode { @@ -60,6 +62,9 @@ /** the InsertRowsNode list */ private List<InsertRowNode> insertRowNodeList; + public AtomicInteger insertCount = new AtomicInteger(0); + public AtomicLong[] metrics; + public InsertRowsNode(PlanNodeId id) { super(id); insertRowNodeList = new ArrayList<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index f671a87..2d573c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TPlanNode; @@ -60,6 +61,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ; @@ -419,6 +421,13 @@ break; case WRITE: PlanNode planNode = instance.getFragment().getPlanNodeTree(); + if (planNode instanceof InsertRowsNode) { + InsertRowsNode insertRowsNode = (InsertRowsNode) planNode; + insertRowsNode.metrics = new AtomicLong[4]; + for (int i = 0; i < 4; i++) { + insertRowsNode.metrics[i] = new AtomicLong(0); + } + } RegionWriteExecutor writeExecutor = new RegionWriteExecutor(); RegionExecutionResult writeResult = writeExecutor.execute(groupId, planNode); if (!writeResult.isAccepted()) { @@ -438,6 +447,16 @@ } else { // some expected and accepted status except SUCCESS_STATUS need to be returned TSStatus status = writeResult.getStatus(); + if (planNode instanceof InsertRowsNode) { + InsertRowsNode insertRowsNode = (InsertRowsNode) planNode; + PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost( + insertRowsNode.metrics[0].get()); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost( + insertRowsNode.metrics[1].get()); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(insertRowsNode.metrics[2].get()); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost( + insertRowsNode.metrics[3].get()); + } if (status != null && 
status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new FragmentInstanceDispatchException(status); }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 469eeae..81a35ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.commons.utils.TimePartitionUtils; @@ -104,6 +106,7 @@ import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; import org.apache.iotdb.db.utils.DateTimeUtils; +import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -1176,6 +1179,7 @@ if (insertRowNode.allMeasurementFailed()) { continue; } + insertRowNode.insertCount = insertRowsNode.insertCount; TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]); if (tsFileProcessor == null) { @@ -1197,10 +1201,29 @@ } } - PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); - PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); + MetricService.getInstance() + .count( + insertRowsNode.insertCount.get(), + Metric.QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + databaseName, + Tag.REGION.toString(), + dataRegionId); + + if (insertRowsNode.metrics != null) { + insertRowsNode.metrics[0].addAndGet(costsForMetrics[0]); + insertRowsNode.metrics[1].addAndGet(costsForMetrics[1]); + insertRowsNode.metrics[2].addAndGet(costsForMetrics[2]); + 
insertRowsNode.metrics[3].addAndGet(costsForMetrics[3]); + } else { + PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); + } if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { if ((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index fe4fe1e..ce8e292 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -156,7 +156,6 @@ series.sortTvListForFlush(); long subTaskTime = System.currentTimeMillis() - startTime; sortTime += subTaskTime; - WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.SORT_TASK, subTaskTime); encodingTaskQueue.put(series); } @@ -258,7 +257,6 @@ Thread.currentThread().interrupt(); } long subTaskTime = System.currentTimeMillis() - starTime; - WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime); memSerializeTime += subTaskTime; } } @@ -344,7 +342,6 @@ } long subTaskTime = System.currentTimeMillis() - starTime; ioTime += subTaskTime; - WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.IO_TASK, subTaskTime); } LOGGER.debug( "flushing a memtable to file {} in database {}, io cost {}ms",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index f8d7f6a..5800434 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -204,18 +204,7 @@ - nullPointsNumber; totalPointsNum += pointsInserted; - - MetricService.getInstance() - .count( - pointsInserted, - Metric.QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId); + insertRowNode.insertCount.addAndGet(pointsInserted); } @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java index 5d2bad0..081b3ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
@@ -27,7 +27,6 @@ import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -48,7 +47,7 @@ private void init() { checkpoints = new ArrayList<>(); try (DataInputStream logStream = - new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)))) { + new DataInputStream(new BufferedInputStream(new WALInputStream(logFile)))) { maxMemTableId = logStream.readLong(); while (logStream.available() > 0) { Checkpoint checkpoint = Checkpoint.deserialize(logStream);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java index 68f4dea..c3fe218 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -19,8 +19,11 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,18 +47,51 @@ protected final FileOutputStream logStream; protected final FileChannel logChannel; protected long size; + protected boolean isEndFile = false; + private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 2 + 1); + private final ICompressor compressor = ICompressor.getCompressor(CompressionType.GZIP); + private final ByteBuffer compressedByteBuffer; protected LogWriter(File logFile) throws FileNotFoundException { this.logFile = logFile; this.logStream = new FileOutputStream(logFile, true); this.logChannel = this.logStream.getChannel(); + if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()) { + compressedByteBuffer = + ByteBuffer.allocate( + compressor.getMaxBytesForCompression( + IoTDBDescriptor.getInstance().getConfig().getWalBufferSize())); + } else { + compressedByteBuffer = null; + } } @Override public void write(ByteBuffer buffer) throws IOException { + int bufferSize = buffer.position(); size += buffer.position(); buffer.flip(); + boolean compressed = false; + int uncompressedSize = bufferSize; + if (!isEndFile && IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression() + /* && bufferSize > 1024 * 512 Do not compress buffer that is less than 512KB */ ) { + compressedByteBuffer.clear(); + compressor.compress(buffer, compressedByteBuffer); + buffer = compressedByteBuffer; + bufferSize = buffer.position(); + buffer.flip(); + compressed = true; + } + size += bufferSize; + headerBuffer.clear(); + headerBuffer.putInt(bufferSize); + 
headerBuffer.put((byte) (compressed ? 1 : 0)); try { + if (compressed) { + headerBuffer.putInt(uncompressedSize); + } + headerBuffer.flip(); + logChannel.write(headerBuffer); logChannel.write(buffer); } catch (ClosedChannelException e) { logger.warn("Cannot write to {}", logFile, e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java index f101eaf..ad3b747 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 
 import java.io.Closeable;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@
   private final File logFile;
   private final FileChannel channel;
   private final WALMetaData metaData;
+  private final DataInputStream logStream;
   private final Iterator<Integer> sizeIterator;
 
   public WALByteBufReader(File logFile) throws IOException {
@@ -46,6 +48,7 @@
   public WALByteBufReader(File logFile, FileChannel channel) throws IOException {
     this.logFile = logFile;
     this.channel = channel;
+    this.logStream = new DataInputStream(new WALInputStream(logFile));
     this.metaData = WALMetaData.readFromWALFile(logFile, channel);
     this.sizeIterator = metaData.getBuffersSize().iterator();
     channel.position(0);
@@ -64,8 +67,9 @@
   public ByteBuffer next() throws IOException {
     int size = sizeIterator.next();
     ByteBuffer buffer = ByteBuffer.allocate(size);
-    channel.read(buffer);
-    buffer.clear();
+    // readFully fills the backing array directly and leaves position 0 / limit size untouched,
+    // so the buffer is already readable; flipping it here would set the limit to 0.
+    logStream.readFully(buffer.array(), 0, size);
     return buffer;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java new file mode 100644 index 0000000..8e742b3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Input stream over a file made of length-prefixed segments, some of which may be compressed.
+ *
+ * <p>Every segment starts with a 5-byte header: an int giving the number of payload bytes stored
+ * on disk, then one flag byte (1 means compressed). A compressed segment is followed by one more
+ * int, the payload size after decompression, and then by the payload itself.
+ *
+ * <p>This class performs no synchronization of its own.
+ */
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+  private static final String UNEXPECTED_EOF = "Unexpected end of file";
+
+  private final FileChannel channel;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES + 1);
+  private final ByteBuffer compressedHeader = ByteBuffer.allocate(Integer.BYTES);
+  // Has to be the counterpart of the GZIP compressor LogWriter uses for compressed segments.
+  private final IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.GZIP);
+  // Decoded payload of the current segment, kept in "ready to be read" state.
+  private ByteBuffer dataBuffer =
+      ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
+
+  public WALInputStream(File logFile) throws IOException {
+    channel = FileChannel.open(logFile.toPath());
+    // Nothing is loaded yet: present the buffer as drained so the first read pulls a segment
+    // instead of handing out the zero bytes of a freshly allocated buffer.
+    dataBuffer.limit(0);
+  }
+
+  /**
+   * Reads the next payload byte, loading further segments as needed.
+   *
+   * @return the byte as an int in the range 0-255, or -1 if the file ends between two segments
+   * @throws EOFException if the file ends in the middle of a segment
+   * @throws IOException if the segment header is invalid or the underlying read fails
+   */
+  @Override
+  public int read() throws IOException {
+    if (!ensureDataAvailable()) {
+      return -1;
+    }
+    return dataBuffer.get() & 0xFF;
+  }
+
+  /**
+   * Bulk variant of {@link #read()}; copies at most the rest of the current segment per call.
+   *
+   * @return the number of bytes copied, 0 if {@code len} is 0, or -1 at the end of the file
+   * @throws IndexOutOfBoundsException if {@code off} and {@code len} do not fit into {@code b}
+   * @throws IOException under the same conditions as {@link #read()}
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    if (!ensureDataAvailable()) {
+      return -1;
+    }
+    int count = Math.min(len, dataBuffer.remaining());
+    dataBuffer.get(b, off, count);
+    return count;
+  }
+
+  @Override
+  public void close() throws IOException {
+    channel.close();
+    dataBuffer = null;
+  }
+
+  /**
+   * Returns the decoded bytes still buffered plus the bytes not yet read from the file. The file
+   * part includes segment headers and compressed payloads, so the value is an estimate whose main
+   * use is telling whether anything is left.
+   */
+  @Override
+  public int available() throws IOException {
+    long unread = channel.size() - channel.position();
+    if (dataBuffer != null) {
+      unread += dataBuffer.remaining();
+    }
+    return (int) Math.min(unread, Integer.MAX_VALUE);
+  }
+
+  /** Loads segments until payload is buffered; returns false at the end of the file. */
+  private boolean ensureDataAvailable() throws IOException {
+    // A loop rather than an if, so that segments with an empty payload are skipped.
+    while (dataBuffer == null || !dataBuffer.hasRemaining()) {
+      if (!loadNextSegment()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Reads the next segment into {@link #dataBuffer}, decompressing it when flagged as compressed.
+   *
+   * @return false if the channel was already at the end of the file, true otherwise
+   */
+  private boolean loadNextSegment() throws IOException {
+    headerBuffer.clear();
+    if (channel.read(headerBuffer) < 0) {
+      return false;
+    }
+    readFully(headerBuffer);
+    headerBuffer.flip();
+    int dataSize = headerBuffer.getInt();
+    boolean isCompressed = headerBuffer.get() == 1;
+    if (dataSize < 0) {
+      throw new IOException("Invalid segment size " + dataSize);
+    }
+    if (isCompressed) {
+      compressedHeader.clear();
+      readFully(compressedHeader);
+      compressedHeader.flip();
+      int uncompressedSize = compressedHeader.getInt();
+      if (uncompressedSize < 0) {
+        throw new IOException("Invalid uncompressed segment size " + uncompressedSize);
+      }
+      ByteBuffer compressedData = ByteBuffer.allocateDirect(dataSize);
+      readFully(compressedData);
+      compressedData.flip();
+      ensureCapacity(uncompressedSize);
+      dataBuffer.clear();
+      unCompressor.uncompress(compressedData, dataBuffer);
+    } else {
+      ensureCapacity(dataSize);
+      dataBuffer.clear();
+      dataBuffer.limit(dataSize);
+      readFully(dataBuffer);
+    }
+    dataBuffer.flip();
+    return true;
+  }
+
+  /** Replaces {@link #dataBuffer} with a larger one when it cannot hold {@code size} bytes. */
+  private void ensureCapacity(int size) {
+    if (dataBuffer == null || dataBuffer.capacity() < size) {
+      dataBuffer = ByteBuffer.allocateDirect(size);
+    }
+  }
+
+  /** Fills {@code target} up to its limit, looping over short reads of the channel. */
+  private void readFully(ByteBuffer target) throws IOException {
+    while (target.hasRemaining()) {
+      if (channel.read(target) < 0) {
+        throw new EOFException(UNEXPECTED_EOF);
+      }
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java index ee50c73..475ea2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -26,12 +26,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import java.util.Iterator; import java.util.NoSuchElementException; @@ -57,9 +55,7 @@ public WALReader(File logFile, boolean fileMayCorrupt) throws IOException { this.logFile = logFile; this.fileMayCorrupt = fileMayCorrupt; - this.logStream = - new DataInputStream( - new BufferedInputStream(Files.newInputStream(logFile.toPath()), STREAM_BUFFER_SIZE)); + this.logStream = new DataInputStream(new WALInputStream(logFile)); } /** Like {@link Iterator#hasNext()}. */
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java index 425fc67..20ae997 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -59,6 +59,7 @@ } private void endFile() throws IOException { + this.isEndFile = true; WALSignalEntry endMarker = new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER); int metaDataSize = metaData.serializedSize(); ByteBuffer buffer = @@ -72,6 +73,7 @@ // add magic string buffer.put(MAGIC_STRING.getBytes()); write(buffer); + this.isEndFile = false; } @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java index d5702e7..633a815 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
@@ -126,10 +126,12 @@ return write(NO_BYTE_TO_READ, buffer); } int len = 0; - byte[] bytes = s.getBytes(); - len += write(bytes.length, buffer); - buffer.put(bytes); - len += bytes.length; + len += write(s.length(), buffer); + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + buffer.put((byte) c); // ascii only + } + len += s.length(); return len; }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java index 217fd3e..60845c7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -58,6 +58,8 @@ private static final PartialPath ALL_MATCH_PATTERN = new PartialPath(new String[] {"root", "**"}); protected String[] nodes; + protected int hashCache; + protected boolean cacheHashCache = false; public PartialPath() {} @@ -713,11 +715,17 @@ @Override public int hashCode() { - int h = 0; - for (String node : nodes) { - h += 31 * h + node.hashCode(); + if (cacheHashCache) { + return hashCache; + } else { + int h = 0; + for (String node : nodes) { + h += 31 * h + node.hashCode(); + } + hashCache = h; + cacheHashCache = true; + return h; } - return h; } @Override