fix bugs and smells in the sonarcloud
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
index d041114..d9b5584 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -50,18 +51,18 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-/** This class is used to manage and allocate wal nodes */
+/** This class is used to manage and allocate wal nodes. */
public class WALManager implements IService {
private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- /** manage all wal nodes and decide how to allocate them */
+ // manage all wal nodes and decide how to allocate them
private final NodeAllocationStrategy walNodesManager;
- /** single thread to delete old .wal files */
+ // single thread to delete old .wal files
private ScheduledExecutorService walDeleteThread;
- /** total disk usage of wal files */
+ // total disk usage of wal files
private final AtomicLong totalDiskUsage = new AtomicLong();
- /** total number of wal files */
+ // total number of wal files
private final AtomicLong totalFileNum = new AtomicLong();
private WALManager() {
@@ -83,7 +84,7 @@
+ (sequence ? "sequence" : "unsequence");
}
- /** Apply for a wal node */
+ /** Apply for a wal node. */
public IWALNode applyForWALNode(String applicantUniqueId) {
if (config.getWalMode() == WALMode.DISABLE) {
return WALFakeNode.getSuccessInstance();
@@ -92,7 +93,7 @@
return walNodesManager.applyForWALNode(applicantUniqueId);
}
- /** WAL node will be registered only when using iot consensus protocol */
+ /** WAL node will be registered only when using iot consensus protocol. */
public void registerWALNode(
String applicantUniqueId, String logDirectory, long startFileVersion, long startSearchIndex) {
if (config.getWalMode() == WALMode.DISABLE
@@ -106,7 +107,7 @@
WritingMetrics.getInstance().createWALNodeInfoMetrics(applicantUniqueId);
}
- /** WAL node will be deleted only when using iot consensus protocol */
+ /** WAL node will be deleted only when using iot consensus protocol. */
public void deleteWALNode(String applicantUniqueId) {
if (config.getWalMode() == WALMode.DISABLE
|| !config.isClusterMode()
@@ -132,7 +133,7 @@
}
}
- /** reboot wal delete thread to hot modify delete wal period */
+ /** Reboot wal delete thread to hot modify delete wal period. */
public void rebootWALDeleteThread() {
if (config.getWalMode() == WALMode.DISABLE) {
return;
@@ -149,7 +150,7 @@
config.getDeleteWalFilesPeriodInMs());
}
- /** submit delete outdated wal files task and wait for result */
+ /** Submit delete outdated wal files task and wait for result. */
public void deleteOutdatedWALFiles() {
if (config.getWalMode() == WALMode.DISABLE) {
return;
@@ -184,7 +185,7 @@
}
}
- /** Wait until all write-ahead logs are flushed */
+ /** Wait until all write-ahead logs are flushed. */
public void waitAllWALFlushed() {
if (config.getWalMode() == WALMode.DISABLE) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
index 857b9a6..d2a01cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.commons.conf.CommonConfig;
@@ -39,7 +40,7 @@
LoggerFactory.getLogger(AbstractNodeAllocationStrategy.class);
private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
- /** manage wal folders */
+ // manage wal folders
protected FolderManager folderManager;
protected AbstractNodeAllocationStrategy() {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/ElasticStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/ElasticStrategy.java
index ab7b18b..c66a4c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/ElasticStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/ElasticStrategy.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.db.wal.node.IWALNode;
@@ -33,17 +34,17 @@
* number of memTables.
*/
public class ElasticStrategy extends AbstractNodeAllocationStrategy {
- /** each wal node manages fixed number of memTables */
+ // each wal node manages fixed number of memTables
public static final int APPLICATION_NODE_RATIO = 4;
- /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+ // protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter
private final Lock nodesLock = new ReentrantLock();
// region these variables should be protected by nodesLock
- /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
+ // wal nodes, the max number of wal nodes is MAX_WAL_NUM
private final List<WALNode> walNodes;
- /** help allocate node for users */
+ // help allocate node for users
private final Map<String, WALNode> uniqueId2Nodes = new HashMap<>();
- /** each wal node has a unique long value identifier */
+ // each wal node has a unique long value identifier
private int nodeIdCounter = -1;
// endregion
@@ -86,7 +87,6 @@
return snapshot;
}
- /** non-thread-safe, used for metrics only */
@Override
public int getNodesNum() {
return walNodes.size();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
index 0c6c341..55a21fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.commons.utils.FileUtils;
@@ -36,10 +37,10 @@
* (like data region) has its own wal node.
*/
public class FirstCreateStrategy extends AbstractNodeAllocationStrategy {
- /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+ // protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter
private final Lock nodesLock = new ReentrantLock();
// region these variables should be protected by nodesLock
- /** wal nodes */
+ // wal nodes
private final Map<String, WALNode> identifier2Nodes = new HashMap<>();
// endregion
@@ -47,17 +48,17 @@
public IWALNode applyForWALNode(String applicantUniqueId) {
nodesLock.lock();
try {
- if (identifier2Nodes.containsKey(applicantUniqueId)) {
- return identifier2Nodes.get(applicantUniqueId);
+ if (!identifier2Nodes.containsKey(applicantUniqueId)) {
+ IWALNode walNode = createWALNode(applicantUniqueId);
+ if (walNode instanceof WALNode) {
+ // avoid deletion
+ walNode.setSafelyDeletedSearchIndex(
+ ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX);
+ identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
+ }
}
- IWALNode walNode = createWALNode(applicantUniqueId);
- if (walNode instanceof WALNode) {
- // avoid deletion
- walNode.setSafelyDeletedSearchIndex(ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX);
- identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
- }
- return walNode;
+ return identifier2Nodes.get(applicantUniqueId);
} finally {
nodesLock.unlock();
}
@@ -112,7 +113,6 @@
return snapshot;
}
- /** non-thread-safe, used for metrics only */
@Override
public int getNodesNum() {
return identifier2Nodes.size();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java
index 9e7f93d..de44e9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -26,12 +27,11 @@
/** This interface */
public interface NodeAllocationStrategy {
- /** Allocate one wal node for the applicant */
+ /** Allocate one wal node for the applicant. */
IWALNode applyForWALNode(String applicantUniqueId);
- /** Get all wal nodes */
+ /** Get all wal nodes. */
List<WALNode> getNodesSnapshot();
-
- /** Get all wal nodes num */
+ /** Get all wal nodes num. Not thread-safe, used for metrics only. */
int getNodesNum();
@TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java
index ffe91e2..0550001 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.db.wal.node.IWALNode;
@@ -31,16 +32,16 @@
* several identifiers (like data regions) can share one wal node.
*/
public class RoundRobinStrategy extends AbstractNodeAllocationStrategy {
- /** max wal nodes number */
+ // max wal nodes number
private final int maxWalNodeNum;
- /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+ // protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter
private final Lock nodesLock = new ReentrantLock();
// region these variables should be protected by nodesLock
- /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
+ // wal nodes, the max number of wal nodes is MAX_WAL_NUM
private final List<WALNode> walNodes;
- /** help allocate node for users */
+ // help allocate node for users
private int nodeCursor = -1;
- /** each wal node has a unique int value identifier */
+ // each wal node has a unique int value identifier
private int nodeIdCounter = -1;
// endregion
@@ -89,7 +90,6 @@
return snapshot;
}
- /** non-thread-safe, used for metrics only */
@Override
public int getNodesNum() {
return walNodes.size();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 372692a..f93cad1 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.buffer;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -35,19 +36,21 @@
public abstract class AbstractWALBuffer implements IWALBuffer {
private static final Logger logger = LoggerFactory.getLogger(AbstractWALBuffer.class);
- /** WALNode identifier of this buffer */
+ // WALNode identifier of this buffer
protected final String identifier;
- /** directory to store .wal files */
+ // directory to store .wal files
protected final String logDirectory;
- /** disk usage of this node‘s wal files */
+ // disk usage of this node‘s wal files
protected long diskUsage = 0;
- /** number of this node‘s wal files */
+ // number of this node‘s wal files
protected long fileNum = 0;
- /** current wal file version id */
+ // current wal file version id
protected volatile long currentWALFileVersion;
- /** current search index */
+ // current search index
protected volatile long currentSearchIndex;
- /** current wal file log writer */
+ // current wal file log writer
+ // it's safe to use volatile here to make this reference thread-safe.
+ @SuppressWarnings("squid:S3077")
protected volatile WALWriter currentWALFileWriter;
protected AbstractWALBuffer(
@@ -87,14 +90,15 @@
* Notice: only called by syncBufferThread and old log writer will be closed by this function.
*
* @return last wal file
+ * @throws IOException If failing to close or open the log writer
*/
protected File rollLogWriter(long searchIndex, WALFileStatus fileStatus) throws IOException {
// close file
File lastFile = currentWALFileWriter.getLogFile();
- String lastName = lastFile.getName();
currentWALFileWriter.close();
addDiskUsage(currentWALFileWriter.size());
addFileNum(1);
+ String lastName = lastFile.getName();
if (WALFileUtils.parseStatusCode(lastName) != fileStatus) {
String targetName =
WALFileUtils.getLogFileName(
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
index f310ee5..b833e06 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.buffer;
import java.util.concurrent.TimeUnit;
@@ -32,24 +33,32 @@
*/
void write(WALEntry walEntry);
- /** Get current log version id */
+ /** Get current log version id. */
long getCurrentWALFileVersion();
- /** Get current wal file's size */
+ /** Get current wal file's size. */
long getCurrentWALFileSize();
- /** Get current search index */
+ /** Get current search index. */
long getCurrentSearchIndex();
@Override
void close();
- /** Wait for next flush operation done */
+ /**
+ * Wait for next flush operation done.
+ *
+ * @throws InterruptedException when interrupted by the flush thread
+ */
void waitForFlush() throws InterruptedException;
- /** Wait for next flush operation done */
+ /**
+ * Wait for next flush operation done.
+ *
+ * @throws InterruptedException when interrupted by the flush thread
+ */
boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
- /** Return true when all wal entries all consumed and flushed */
+ /** Return true when all wal entries all consumed and flushed. */
boolean isAllWALEntriesConsumed();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java
index f78355b..e1590fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.buffer;
import java.nio.ByteBuffer;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 9aff438..e1efbaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.buffer;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -64,28 +65,34 @@
private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
- /** whether close method is called */
+ // whether close method is called
private volatile boolean isClosed = false;
- /** WALEntries */
+ // WALEntries
private final BlockingQueue<WALEntry> walEntries = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
- /** lock to provide synchronization for double buffers mechanism, protecting buffers status */
+ // lock to provide synchronization for double buffers mechanism, protecting buffers status
private final Lock buffersLock = new ReentrantLock();
- /** condition to guarantee correctness of switching buffers */
+ // condition to guarantee correctness of switching buffers
private final Condition idleBufferReadyCondition = buffersLock.newCondition();
// region these variables should be protected by buffersLock
- /** two buffers switch between three statuses (there is always 1 buffer working) */
+ /** two buffers switch between three statuses (there is always 1 buffer working). */
// buffer in working status, only updated by serializeThread
+ // it's safe to use volatile here to make this reference thread-safe.
+ @SuppressWarnings("squid:S3077")
private volatile ByteBuffer workingBuffer;
// buffer in idle status
+ // it's safe to use volatile here to make this reference thread-safe.
+ @SuppressWarnings("squid:S3077")
private volatile ByteBuffer idleBuffer;
// buffer in syncing status, serializeThread makes sure no more writes to syncingBuffer
+ // it's safe to use volatile here to make this reference thread-safe.
+ @SuppressWarnings("squid:S3077")
private volatile ByteBuffer syncingBuffer;
// endregion
- /** file status of working buffer, updating file writer's status when syncing */
+ // file status of working buffer, updating file writer's status when syncing
protected volatile WALFileStatus currentFileStatus;
- /** single thread to serialize WALEntry to workingBuffer */
+ // single thread to serialize WALEntry to workingBuffer
private final ExecutorService serializeThread;
- /** single thread to sync syncingBuffer to disk */
+ // single thread to sync syncingBuffer to disk
private final ExecutorService syncBufferThread;
public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
@@ -137,7 +144,7 @@
}
// region Task of serializeThread
- /** This info class traverses some extra info from serializeThread to syncBufferThread */
+ /** This info class traverses some extra info from serializeThread to syncBufferThread. */
private static class SerializeInfo {
final WALMetaData metaData = new WALMetaData();
final List<WALFlushListener> fsyncListeners = new ArrayList<>();
@@ -161,7 +168,7 @@
}
}
- /** In order to control memory usage of blocking queue, get 1 and then serialize 1 */
+ // In order to control memory usage of blocking queue, get 1 and then serialize 1
private void serialize() {
// try to get first WALEntry with blocking interface
long start = System.nanoTime();
@@ -214,6 +221,8 @@
}
/**
+ * Handle wal info and signal entry.
+ *
* @return true if fsyncWorkingBuffer has been called, which means this serialization task
* should be ended.
*/
@@ -226,7 +235,7 @@
return false;
}
- /** Handle a normal WALEntry. */
+ /** Handle a normal info WALEntry. */
private void handleInfoEntry(WALEntry walEntry) {
int size = byteBufferView.position();
try {
@@ -261,6 +270,8 @@
}
/**
+ * Handle a signal entry.
+ *
* @return true if fsyncWorkingBuffer has been called, which means this serialization task
* should be ended.
*/
@@ -433,7 +444,7 @@
@Override
public void run() {
- long start = System.nanoTime();
+ final long startTime = System.nanoTime();
long walFileVersionId = currentWALFileVersion;
long position = currentWALFileWriter.size();
currentWALFileWriter.updateFileStatus(fileStatus);
@@ -502,7 +513,7 @@
}
}
WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
- WRITING_METRICS.recordSyncWALBufferCost(System.nanoTime() - start, forceFlag);
+ WRITING_METRICS.recordSyncWALBufferCost(System.nanoTime() - startTime, forceFlag);
}
}
@@ -550,6 +561,7 @@
walEntries.put(new WALSignalEntry(WALEntryType.CLOSE_SIGNAL));
} catch (InterruptedException e) {
logger.error("Fail to put CLOSE_SIGNAL to walEntries.", e);
+ Thread.currentThread().interrupt();
}
isClosed = true;
shutdownThread(serializeThread, ThreadName.WAL_SERIALIZE);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
index 04781a8..ce4ebfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.buffer;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -44,16 +44,14 @@
public abstract class WALEntry implements SerializedSize {
private static final Logger logger = LoggerFactory.getLogger(WALEntry.class);
- /** type of value */
+ // type of value
protected final WALEntryType type;
- /** memTable id */
+ // memTable id
protected final long memTableId;
- /** value(physical plan or memTable snapshot) */
+ // value(physical plan or memTable snapshot)
protected final WALEntryValue value;
- /**
- * listen whether this WALEntry has been written to the filesystem, null iff this WALEntry is
- * deserialized from .wal file
- */
+ // listen whether this WALEntry has been written to the filesystem
+ // null iff this WALEntry is deserialized from .wal file
protected final WALFlushListener walFlushListener;
protected WALEntry(long memTableId, WALEntryValue value, boolean wait) {
@@ -82,8 +80,7 @@
public abstract void serialize(IWALByteBufferView buffer);
- public static WALEntry deserialize(DataInputStream stream)
- throws IllegalPathException, IOException {
+ public static WALEntry deserialize(DataInputStream stream) throws IOException {
byte typeNum = stream.readByte();
WALEntryType type = WALEntryType.valueOf(typeNum);
@@ -121,7 +118,7 @@
/**
* This deserialization method is only for iot consensus and just deserializes InsertRowNode and
- * InsertTabletNode
+ * InsertTabletNode.
*/
public static PlanNode deserializeForConsensus(ByteBuffer buffer) {
logger.debug(
@@ -131,12 +128,17 @@
buffer.position());
// wal entry type
buffer.get();
- // memtable id
+ // memTable id
buffer.getLong();
return PlanNodeType.deserializeFromWAL(buffer);
}
@Override
+ public int hashCode() {
+ return Objects.hash(type, memTableId, value);
+ }
+
+ @Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java
index 9721484..66a6e6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.buffer;
-/** Type of {@link WALEntry}, including info type and signal type */
+/** Type of {@link WALEntry}, including info type and signal type. */
public enum WALEntryType {
// region info entry type
@Deprecated
@@ -27,6 +28,7 @@
INSERT_TABLET_PLAN((byte) 1),
@Deprecated
DELETE_PLAN((byte) 2),
+ /** {@link org.apache.iotdb.db.engine.memtable.AbstractMemTable} */
MEMORY_TABLE_SNAPSHOT((byte) 3),
/** {@link org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode} */
INSERT_ROW_NODE((byte) 4),
@@ -36,14 +38,13 @@
DELETE_DATA_NODE((byte) 6),
// endregion
// region signal entry type
- /** signal wal buffer has been closed */
+ // signal wal buffer has been closed
CLOSE_SIGNAL(Byte.MIN_VALUE),
- /** signal wal buffer to roll wal log writer */
+ // signal wal buffer to roll wal log writer
ROLL_WAL_LOG_WRITER_SIGNAL((byte) (Byte.MIN_VALUE + 1)),
- /** mark the wal file info part ends */
- WAL_FILE_INFO_END_MARKER((byte) (Byte.MIN_VALUE + 2)),
-// endregion
-;
+ // mark the wal file info part ends
+ WAL_FILE_INFO_END_MARKER((byte) (Byte.MIN_VALUE + 2));
+ // endregion
private final byte code;
@@ -55,7 +56,7 @@
return code;
}
- /** Returns true when this type should be searched */
+ /** Returns true when this type should be searched. */
public boolean needSearch() {
return this == INSERT_TABLET_NODE || this == INSERT_ROW_NODE || this == DELETE_DATA_NODE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java
index df9a24c..191e5f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.buffer;
import org.apache.iotdb.db.utils.SerializedSize;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java
index 6b8ebb4..d77935d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.buffer;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -23,14 +24,15 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.wal.utils.WALMode;
-/** This entry class stores info for persistence */
+import java.util.Objects;
+
+/** This entry class stores info for persistence. */
public class WALInfoEntry extends WALEntry {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
- /** wal entry type 1 byte, memTable id 8 bytes */
+ // wal entry type 1 byte, memTable id 8 bytes
public static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Long.BYTES;
- /** extra info for InsertTablet type value */
+ // extra info for InsertTablet type value
private TabletInfo tabletInfo;
public WALInfoEntry(long memTableId, WALEntryValue value, boolean wait) {
@@ -78,9 +80,9 @@
}
private static class TabletInfo {
- /** start row of insert tablet */
+ // start row of insert tablet
private final int tabletStart;
- /** end row of insert tablet */
+ // end row of insert tablet
private final int tabletEnd;
public TabletInfo(int tabletStart, int tabletEnd) {
@@ -93,4 +95,19 @@
public boolean isSignal() {
return false;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), tabletInfo.tabletStart, tabletInfo.tabletEnd);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!super.equals(obj)) {
+ return false;
+ }
+ WALInfoEntry other = (WALInfoEntry) obj;
+ return this.tabletInfo.tabletStart == other.tabletInfo.tabletStart
+ && this.tabletInfo.tabletEnd == other.tabletInfo.tabletEnd;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java
index 854c269..0fc2e54 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java
@@ -16,11 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.buffer;
import java.nio.ByteBuffer;
-/** This entry class provides a signal to help wal buffer dealing with some special cases */
+/** This entry class provides a signal to help wal buffer dealing with some special cases. */
public class WALSignalEntry extends WALEntry {
public WALSignalEntry(WALEntryType signalType) {
this(signalType, false);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/Checkpoint.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/Checkpoint.java
index b21f4e9..37eb559 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/Checkpoint.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/Checkpoint.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.checkpoint;
import org.apache.iotdb.db.utils.SerializedSize;
@@ -32,12 +33,12 @@
* brief information of each memTable.
*/
public class Checkpoint implements SerializedSize {
- /** checkpoint type 1 byte, checkpoint number 4 bytes */
+ // checkpoint type 1 byte, checkpoint number 4 bytes
private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Integer.BYTES;
- /** checkpoint type */
+ // checkpoint type
private final CheckpointType type;
- /** memTable information */
+ // memTable information
private final List<MemTableInfo> memTableInfos;
public Checkpoint(CheckpointType type, List<MemTableInfo> memTableInfos) {
@@ -78,6 +79,11 @@
}
@Override
+ public int hashCode() {
+ return Objects.hash(type, memTableInfos);
+ }
+
+ @Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
index 73e96d6..3eab8e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.checkpoint;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -44,31 +45,31 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-/** This class is used to manage checkpoints of one wal node */
+/** This class is used to manage checkpoints of one wal node. */
public class CheckpointManager implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(CheckpointManager.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
- /** WALNode identifier of this checkpoint manager */
+ // WALNode identifier of this checkpoint manager
protected final String identifier;
- /** directory to store .checkpoint file */
+ // directory to store .checkpoint file
protected final String logDirectory;
- /**
- * protect concurrent safety of checkpoint info, including memTableId2Info, cachedByteBuffer,
- * currentLogVersion and currentLogWriter
- */
+ // protect concurrent safety of checkpoint info
+ // including memTableId2Info, cachedByteBuffer, currentLogVersion and currentLogWriter
private final Lock infoLock = new ReentrantLock();
// region these variables should be protected by infoLock
- /** memTable id -> memTable info */
+ // memTable id -> memTable info
private final Map<Long, MemTableInfo> memTableId2Info = new HashMap<>();
- /** cache the biggest byte buffer to serialize checkpoint */
+ // cache the biggest byte buffer to serialize checkpoint
+ // it's safe to use volatile here to make this reference thread-safe.
+ @SuppressWarnings("squid:S3077")
private volatile ByteBuffer cachedByteBuffer;
- /** max memTable id */
+ // max memTable id
private long maxMemTableId = 0;
- /** current checkpoint file version id, only updated by fsyncAndDeleteThread */
- private int currentCheckPointFileVersion = 0;
- /** current checkpoint file log writer, only updated by fsyncAndDeleteThread */
+ // current checkpoint file version id, only updated by fsyncAndDeleteThread
+ private long currentCheckPointFileVersion = 0;
+ // current checkpoint file log writer, only updated by fsyncAndDeleteThread
private ILogWriter currentLogWriter;
// endregion
@@ -114,8 +115,8 @@
}
/**
- * make checkpoint for global memTables' info, this checkpoint only exists in the beginning of
- * each checkpoint file
+ * Make checkpoint for global memTables info, this checkpoint only exists in the beginning of each
+ * checkpoint file.
*/
private void makeGlobalInfoCP() {
long start = System.nanoTime();
@@ -126,7 +127,7 @@
WRITING_METRICS.recordMakeCheckpointCost(checkpoint.getType(), System.nanoTime() - start);
}
- /** make checkpoint for create memTable info */
+ /** Make checkpoint for create memTable info. */
public void makeCreateMemTableCP(MemTableInfo memTableInfo) {
infoLock.lock();
long start = System.nanoTime();
@@ -144,7 +145,7 @@
}
}
- /** make checkpoint for flush memTable info */
+ /** Make checkpoint for flush memTable info. */
public void makeFlushMemTableCP(long memTableId) {
infoLock.lock();
long start = System.nanoTime();
@@ -188,7 +189,7 @@
}
// region Task to fsync checkpoint file
- /** Fsync checkpoints to the disk */
+ /** Fsync checkpoints to the disk. */
private void fsyncCheckpointFile() {
infoLock.lock();
try {
@@ -211,7 +212,9 @@
SystemFileFactory.INSTANCE.getFile(
logDirectory,
CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion - 1));
- oldFile.delete();
+ if (!oldFile.delete()) {
+ logger.info("Fail to delete last checkpoint file {}", oldFile);
+ }
}
} catch (IOException e) {
logger.error(
@@ -298,7 +301,7 @@
}
// endregion
- /** Get MemTableInfo of oldest MemTable, whose first version id is smallest */
+ /** Get MemTableInfo of oldest MemTable, whose first version id is smallest. */
public MemTableInfo getOldestMemTableInfo() {
// find oldest memTable
List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
@@ -328,7 +331,7 @@
return firstValidVersionId;
}
- /** Get total cost of active memTables */
+ /** Get total cost of active memTables. */
public long getTotalCostOfActiveMemTables() {
List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
long totalCost = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointType.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointType.java
index bbd0279..7c302cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointType.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointType.java
@@ -16,15 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.checkpoint;
-/** Type of {@link Checkpoint} */
+/** Type of {@link Checkpoint}. */
public enum CheckpointType {
- /** record all existing memtables' info */
+ // record all existing memTables info
GLOBAL_MEMORY_TABLE_INFO((byte) 0, "global memory table info"),
- /** record create info of one memtable */
+ // record create info of one memTable
CREATE_MEMORY_TABLE((byte) 1, "create memory table"),
- /** record flush info of one memtable */
+ // record flush info of one memTable
FLUSH_MEMORY_TABLE((byte) 2, "flush memory table");
private final byte code;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
index bcf1ebe..46f8492 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.checkpoint;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -33,20 +34,20 @@
* file version id of its first {@link WALEntry}.
*/
public class MemTableInfo implements SerializedSize {
- /** memTable id 8 bytes, first version id 8 bytes */
+ // memTable id 8 bytes, first version id 8 bytes
private static final int FIXED_SERIALIZED_SIZE = Long.BYTES * 2;
- /** memTable */
+ // memTable
private IMemTable memTable;
- /** memTable pin count */
+ // memTable pin count
private int pinCount;
- /** memTable is flushed or not */
+ // memTable is flushed or not
private boolean flushed;
- /** memTable id */
+ // memTable id
private long memTableId;
- /** path of the tsFile which this memTable will be flushed to */
+ // path of the tsFile which this memTable will be flushed to
private String tsFilePath;
- /** version id of the file where this memTable's first WALEntry is located */
+ // version id of the file where this memTable's first WALEntry is located
private volatile long firstFileVersionId;
private MemTableInfo() {}
@@ -78,6 +79,11 @@
}
@Override
+ public int hashCode() {
+ return Objects.hash(memTableId, tsFilePath, firstFileVersionId);
+ }
+
+ @Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
@@ -115,7 +121,7 @@
}
public void setFlushed() {
- // avoid memory leak;
+ // avoid memory leak
this.memTable = null;
this.flushed = true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java b/server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java
index 3e9722d..c19a8ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.exception;
public class MemTablePinException extends WALException {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/exception/WALException.java b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALException.java
index ac0c666..f881ba3 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/exception/WALException.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALException.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.exception;
import org.apache.iotdb.commons.exception.IoTDBException;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/exception/WALNodeClosedException.java b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALNodeClosedException.java
index bd9a432..5b851d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/exception/WALNodeClosedException.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALNodeClosedException.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.exception;
public class WALNodeClosedException extends WALException {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java
index e596ca3..8c1de9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.exception;
public class WALPipeException extends WALException {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/exception/WALRecoverException.java b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALRecoverException.java
index ce83631..844f335 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/exception/WALRecoverException.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALRecoverException.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.exception;
public class WALRecoverException extends WALException {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointReader.java
index 94ad744..2b0819e 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointReader.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.db.wal.checkpoint.Checkpoint;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
index e07a107..f6bcc67 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.db.wal.checkpoint.Checkpoint;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/ILogWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/ILogWriter.java
index ee34698..4c51c8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/ILogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/ILogWriter.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.io;
import java.io.Closeable;
@@ -60,7 +61,7 @@
long size();
/**
- * Gets the log file
+ * Gets the log file.
*
* @return log file
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java
index 416aadc..12fb871 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.db.wal.buffer.WALEntry;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
index 1845bd1..3d6cd88 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.db.wal.buffer.WALEntry;
@@ -67,12 +68,16 @@
channel.position(0);
}
- /** Like {@link Iterator#hasNext()} */
+ /** Like {@link Iterator#hasNext()}. */
public boolean hasNext() {
return sizeIterator.hasNext();
}
- /** Like {@link Iterator#next()} */
+ /**
+ * Like {@link Iterator#next()}.
+ *
+ * @throws IOException when failing to read from channel.
+ */
public ByteBuffer next() throws IOException {
int size = sizeIterator.next();
ByteBuffer buffer = ByteBuffer.allocate(size);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java
index 35bbd5e..21eb79d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.db.utils.SerializedSize;
@@ -31,13 +32,12 @@
* entry and the number of entries.
*/
public class WALMetaData implements SerializedSize {
- /** search index 8 byte, wal entries' number 4 bytes */
+ // search index 8 byte, wal entries' number 4 bytes
private static final int FIXED_SERIALIZED_SIZE = Long.BYTES + Integer.BYTES;
- /** search index of first entry */
+ // search index of first entry
private long firstSearchIndex;
-
- /** each entry's size */
+ // each entry's size
private final List<Integer> buffersSize;
public WALMetaData() {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
index 18e1b89..048407c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.io;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.buffer.WALEntryType;
@@ -40,7 +40,7 @@
*/
public class WALReader implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(WALReader.class);
- /** 1/10 of .wal file size as buffer size */
+ // use 1/10 of .wal file size as buffer size
private static final int STREAM_BUFFER_SIZE =
(int) IoTDBDescriptor.getInstance().getConfig().getWalFileSizeThresholdInByte() / 10;
@@ -62,7 +62,7 @@
new BufferedInputStream(Files.newInputStream(logFile.toPath()), STREAM_BUFFER_SIZE));
}
- /** Like {@link Iterator#hasNext()} */
+ /** Like {@link Iterator#hasNext()}. */
public boolean hasNext() {
if (nextEntry != null) {
return true;
@@ -77,10 +77,6 @@
nextEntry = null;
return false;
}
- } catch (IllegalPathException e) {
- fileCorrupted = true;
- logger.warn(
- "WALEntry of wal file {} contains illegal path, skip illegal WALEntries.", logFile, e);
} catch (Exception e) {
fileCorrupted = true;
// log only when file should be complete
@@ -92,7 +88,11 @@
return nextEntry != null;
}
- /** Like {@link Iterator#next()} */
+ /**
+ * Like {@link Iterator#next()}.
+ *
+ * @throws NoSuchElementException when not calling hasNext before.
+ */
public WALEntry next() {
if (nextEntry == null) {
throw new NoSuchElementException();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
index b0f809e..b928b58 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.db.wal.buffer.WALEntry;
@@ -34,15 +35,18 @@
public static final int MAGIC_STRING_BYTES = MAGIC_STRING.getBytes().length;
private WALFileStatus walFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
-
- /** wal files' metadata */
+ // wal files' metadata
protected final WALMetaData metaData = new WALMetaData();
public WALWriter(File logFile) throws FileNotFoundException {
super(logFile);
}
- /** Writes buffer and update its' metadata */
+ /**
+ * Writes buffer and update its' metadata.
+ *
+ * @throws IOException when failing to write
+ */
public void write(ByteBuffer buffer, WALMetaData metaData) throws IOException {
// update metadata
updateMetaData(metaData);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java
index e700789..293b97c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.node;
import org.apache.iotdb.consensus.common.DataSet;
@@ -30,16 +31,16 @@
/** This interface provides uniform interface for writing wal and making checkpoints. */
public interface IWALNode extends FlushListener, AutoCloseable, ConsensusReqReader, DataSet {
- /** Log InsertRowNode */
+ /** Log InsertRowNode. */
WALFlushListener log(long memTableId, InsertRowNode insertRowNode);
- /** Log InsertTabletNode */
+ /** Log InsertTabletNode. */
WALFlushListener log(long memTableId, InsertTabletNode insertTabletNode, int start, int end);
- /** Log DeleteDataNode */
+ /** Log DeleteDataNode. */
WALFlushListener log(long memTableId, DeleteDataNode deleteDataNode);
- /** Callback when memTable created */
+ /** Callback when memTable created. */
void onMemTableCreated(IMemTable memTable, String targetTsFile);
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
index e070789..76f7475 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.node;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -23,19 +24,20 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.wal.exception.WALException;
+import org.apache.iotdb.db.wal.utils.listener.AbstractResultListener.Status;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
/** This class provides fake wal node when wal is disabled or exception happens. */
public class WALFakeNode implements IWALNode {
- private final WALFlushListener.Status status;
+ private final Status status;
private final WALFlushListener successListener;
private final WALFlushListener failListener;
- private WALFakeNode(WALFlushListener.Status status) {
+ private WALFakeNode(Status status) {
this(status, null);
}
- public WALFakeNode(WALFlushListener.Status status, Exception cause) {
+ public WALFakeNode(Status status, Exception cause) {
this.status = status;
this.successListener = new WALFlushListener(false, null);
this.successListener.succeed();
@@ -116,8 +118,7 @@
public static WALFakeNode getFailureInstance(Exception e) {
return new WALFakeNode(
- WALFlushListener.Status.FAILURE,
- new WALException("Cannot write wal into a fake node. ", e));
+ Status.FAILURE, new WALException("Cannot write wal into a fake node. ", e));
}
public static WALFakeNode getSuccessInstance() {
@@ -125,7 +126,6 @@
}
private static class WALFakeNodeHolder {
- private static final WALFakeNode SUCCESS_INSTANCE =
- new WALFakeNode(WALFlushListener.Status.SUCCESS);
+ private static final WALFakeNode SUCCESS_INSTANCE = new WALFakeNode(Status.SUCCESS);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 8a040a9..4851cd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.node;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -49,6 +50,7 @@
import org.apache.iotdb.db.wal.io.WALByteBufReader;
import org.apache.iotdb.db.wal.utils.WALFileStatus;
import org.apache.iotdb.db.wal.utils.WALFileUtils;
+import org.apache.iotdb.db.wal.utils.listener.AbstractResultListener.Status;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.utils.TsFileUtils;
@@ -81,35 +83,29 @@
public class WALNode implements IWALNode {
private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- /** no iot consensus, all insert nodes can be safely deleted */
+ // no iot consensus, all insert nodes can be safely deleted
public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE;
-
- /** timeout threshold when waiting for next wal entry */
+ // timeout threshold when waiting for next wal entry
private static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
-
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
- /** unique identifier of this WALNode */
+ // unique identifier of this WALNode
private final String identifier;
- /** directory to store this node's files */
+ // directory to store this node's files
private final File logDirectory;
- /** wal buffer */
+ // wal buffer
private final WALBuffer buffer;
- /** manage checkpoints */
+ // manage checkpoints
private final CheckpointManager checkpointManager;
- /**
- * memTable id -> memTable snapshot count, used to avoid write amplification caused by frequent
- * snapshot
- */
+ // memTable id -> memTable snapshot count
+ // used to avoid write amplification caused by frequent snapshot
private final Map<Long, Integer> memTableSnapshotCount = new ConcurrentHashMap<>();
- /**
- * total cost of flushedMemTables. when memControl enabled, cost is memTable ram cost, otherwise
- * cost is memTable count
- */
+ // total cost of flushedMemTables
+ // when memControl enabled, cost is memTable ram cost, otherwise cost is memTable count
private final AtomicLong totalCostOfFlushedMemTables = new AtomicLong();
- /** version id -> cost sum of memTables flushed at this file version */
+ // version id -> cost sum of memTables flushed at this file version
private final Map<Long, Long> walFileVersionId2MemTablesTotalCost = new ConcurrentHashMap<>();
- /** insert nodes whose search index are before this value can be deleted safely */
+ // insert nodes whose search index are before this value can be deleted safely
private volatile long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
@@ -188,19 +184,28 @@
}
// region methods for pipe
- /** Pin the wal files of the given memory table */
+ /**
+ * Pin the wal files of the given memory table. Notice: cannot pin one memTable too long,
+ * otherwise the wal disk usage may too large.
+ *
+ * @throws MemTablePinException If the memTable has been flushed
+ */
public void pinMemTable(long memTableId) throws MemTablePinException {
checkpointManager.pinMemTable(memTableId);
}
- /** Unpin the wal files of the given memory table */
+ /**
+ * Unpin the wal files of the given memory table.
+ *
+ * @throws MemTablePinException If there aren't corresponding pin operations
+ */
public void unpinMemTable(long memTableId) throws MemTablePinException {
checkpointManager.unpinMemTable(memTableId);
}
// endregion
// region Task to delete outdated .wal files
- /** Delete outdated .wal files */
+ /** Delete outdated .wal files. */
public void deleteOutdatedFiles() {
try {
new DeleteOutdatedFileTask().run();
@@ -211,11 +216,11 @@
private class DeleteOutdatedFileTask implements Runnable {
private static final int MAX_RECURSION_TIME = 5;
- /** .wal files whose version ids are less than first valid version id should be deleted */
+ // .wal files whose version ids are less than first valid version id should be deleted
private long firstValidVersionId;
- /** the effective information ratio */
+ // the effective information ratio
private double effectiveInfoRatio;
- /** recursion time of calling deletion */
+ // recursion time of calling deletion
private int recursionTime = 0;
@Override
@@ -280,7 +285,7 @@
}
}
- /** Return true iff cannot delete all outdated files because of IoTConsensus */
+ /** Return true iff cannot delete all outdated files because of IoTConsensus. */
private boolean deleteOutdatedFiles() {
// find all files to delete
// delete files whose version < firstValidVersionId
@@ -348,14 +353,14 @@
return toDelete;
}
- /** Return true iff effective information ratio is too small or disk usage is too large */
+ /** Return true iff effective information ratio is too small or disk usage is too large. */
private boolean shouldSnapshotOrFlush() {
return effectiveInfoRatio < config.getWalMinEffectiveInfoRatio()
|| WALManager.getInstance().shouldThrottle();
}
/**
- * Snapshot or flush one memTable,
+ * Snapshot or flush one memTable.
*
* @return true if snapshot or flush is executed successfully
*/
@@ -442,10 +447,7 @@
}
}
- /**
- * synchronize memTable to make sure snapshot is made before memTable flush operation, {@link
- * org.apache.iotdb.db.engine.storagegroup.TsFileProcessor#flushOneMemTable}
- */
+ // synchronize memTable to make sure snapshot is made before memTable flush operation
private void snapshotMemTable(DataRegion dataRegion, File tsFile, MemTableInfo memTableInfo) {
IMemTable memTable = memTableInfo.getMemTable();
@@ -465,7 +467,7 @@
WALEntry rollWALFileSignal =
new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
WALFlushListener fileRolledListener = log(rollWALFileSignal);
- if (fileRolledListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+ if (fileRolledListener.waitForResult() == Status.FAILURE) {
logger.error("Fail to roll wal log writer.", fileRolledListener.getCause());
return;
}
@@ -480,7 +482,7 @@
// wait until getting the result
// it's low-risk to block writes awhile because this memTable accumulates slowly
- if (flushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+ if (flushListener.waitForResult() == Status.FAILURE) {
logger.error("Fail to snapshot memTable of {}", tsFile, flushListener.getCause());
}
logger.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java
index d9bcd2e..0ef7616 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.recover;
import org.apache.iotdb.db.wal.checkpoint.Checkpoint;
@@ -32,7 +33,7 @@
public class CheckpointRecoverUtils {
private CheckpointRecoverUtils() {}
- /** Recover memTable information from checkpoint folder */
+ /** Recover memTable information from checkpoint folder. */
public static CheckpointInfo recoverMemTableInfo(File logDirectory) {
// find all .checkpoint file
File[] checkpointFiles = CheckpointFileUtils.listAllCheckpointFiles(logDirectory);
@@ -71,6 +72,8 @@
memTableId2Info.remove(memTableInfo.getMemTableId());
}
break;
+ default:
+ break;
}
}
return new CheckpointInfo(maxMemTableId, memTableId2Info);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
index 677e19a..b15986e 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.recover;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -55,13 +56,12 @@
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final WALRecoverManager walRecoverManger = WALRecoverManager.getInstance();
- /** this directory store one wal node's .wal and .checkpoint files */
+ // this directory store one wal node's .wal and .checkpoint files
private final File logDirectory;
- /** latch to collect all nodes' recovery end information */
+ // latch to collect all nodes' recovery end information
private final CountDownLatch allNodesRecoveredLatch;
- /** version id of first valid .wal file */
+ // version id of first valid .wal file
private long firstValidVersionId = Long.MAX_VALUE;
-
private Map<Long, MemTableInfo> memTableId2Info;
private Map<Long, UnsealedTsFileRecoverPerformer> memTableId2RecoverPerformer;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index 3f54548..85e8f8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.recover;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -50,13 +51,13 @@
private static final Logger logger = LoggerFactory.getLogger(WALRecoverManager.class);
private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
- /** true when the recover procedure has started */
+ // true when the recover procedure has started
private volatile boolean hasStarted = false;
- /** start recovery after all data regions have submitted unsealed zero-level TsFiles */
+ // start recovery after all data regions have submitted unsealed zero-level TsFiles
private volatile CountDownLatch allDataRegionScannedLatch;
- /** threads to recover wal nodes */
+ // threads to recover wal nodes
private ExecutorService recoverThreadPool;
- /** stores all UnsealedTsFileRecoverPerformer submitted by data region processors */
+ // stores all UnsealedTsFileRecoverPerformer submitted by data region processors
private final Map<String, UnsealedTsFileRecoverPerformer> absolutePath2RecoverPerformer =
new ConcurrentHashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java
index f20d9c6..ae91837 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.recover;
import org.apache.iotdb.db.wal.io.WALMetaData;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
index aa08daf..0e5b1a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.recover.file;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -36,14 +37,14 @@
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
-/** This class is used to help recover TsFile */
+/** This class is used to help recover TsFile. */
public abstract class AbstractTsFileRecoverPerformer implements Closeable {
private static final Logger logger =
LoggerFactory.getLogger(AbstractTsFileRecoverPerformer.class);
- /** TsFile which needs recovery */
+ // TsFile which needs recovery
protected final TsFileResource tsFileResource;
- /** this writer will be open when .resource file doesn't exist */
+ // this writer will be open when .resource file doesn't exist
protected RestorableTsFileIOWriter writer;
protected AbstractTsFileRecoverPerformer(TsFileResource tsFileResource) {
@@ -54,6 +55,9 @@
* Recover TsFile with RestorableTsFileIOWriter, including load .resource file (reconstruct when
* necessary) and truncate the file to remaining corrected data. <br>
* Notice: this method may open a {@link RestorableTsFileIOWriter}, remember to close it.
+ *
+ * @throws DataRegionException when failing to new RestorableTsFileIOWriter.
+ * @throws IOException when failing to read .resource file.
*/
protected void recoverWithWriter() throws DataRegionException, IOException {
File tsFile = tsFileResource.getTsFile();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/SealedTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/SealedTsFileRecoverPerformer.java
index ddf966b..9532c97 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/SealedTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/SealedTsFileRecoverPerformer.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.recover.file;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -33,7 +34,10 @@
/**
* Recover sealed TsFile, including load .resource file (reconstruct when necessary) and truncate
- * the file to remaining corrected data
+ * the file to remaining corrected data.
+ *
+ * @throws DataRegionException when failing to recover.
+ * @throws IOException when failing to end file.
*/
public void recover() throws DataRegionException, IOException {
super.recoverWithWriter();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index 093aa30..0164971 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.recover.file;
import org.apache.iotdb.commons.path.PartialPath;
@@ -32,9 +33,6 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.List;
@@ -44,14 +42,12 @@
* guarantee concurrency safety.
*/
public class TsFilePlanRedoer {
- private static final Logger logger = LoggerFactory.getLogger(TsFilePlanRedoer.class);
-
private final TsFileResource tsFileResource;
- /** only unsequence file tolerates duplicated data */
+ // only unsequence file tolerates duplicated data
private final boolean sequence;
- /** virtual database's idTable of this tsFile */
+ // virtual database's idTable of this tsFile
private final IDTable idTable;
- /** store data when redoing logs */
+ // store data when redoing logs
private IMemTable recoveryMemTable = new PrimitiveMemTable();
public TsFilePlanRedoer(TsFileResource tsFileResource, boolean sequence, IDTable idTable) {
@@ -80,7 +76,6 @@
}
}
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
void redoInsert(InsertNode node) throws WriteProcessException {
if (!node.hasValidMeasurements()) {
return;
@@ -105,8 +100,7 @@
}
if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
- // TODO get device id by idTable
- // idTable.getSeriesSchemas(node);
+ // TODO get device id by idTable - idTable.getSeriesSchemas(node)
} else {
node.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(node.getDevicePath()));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index 76a6eee..8e88f1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.recover.file;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
@@ -61,13 +62,13 @@
private static final Logger logger =
LoggerFactory.getLogger(UnsealedTsFileRecoverPerformer.class);
- /** sequence file or not */
+ // sequence file or not
private final boolean sequence;
- /** add recovered TsFile back to data region */
+ // add recovered TsFile back to data region
private final Consumer<UnsealedTsFileRecoverPerformer> callbackAfterUnsealedTsFileRecovered;
- /** redo wal log to recover TsFile */
+ // redo wal log to recover TsFile
private final TsFilePlanRedoer walRedoer;
- /** trace result of this recovery */
+ // trace result of this recovery
private final WALRecoverListener recoverListener;
public UnsealedTsFileRecoverPerformer(
@@ -85,6 +86,9 @@
/**
* Make preparation for recovery, including load .resource file (reconstruct when necessary) and
* truncate the file to remaining corrected data.
+ *
+ * @throws DataRegionException when failing to recover.
+ * @throws IOException when failing to recover.
*/
public void startRecovery() throws DataRegionException, IOException {
super.recoverWithWriter();
@@ -178,7 +182,7 @@
return modificationsForResource;
}
- /** Redo log */
+ /** Redo log. */
public void redoLog(WALEntry walEntry) {
// skip redo wal log when this TsFile is not crashed
if (!hasCrashed()) {
@@ -209,7 +213,11 @@
}
}
- /** Run last procedures to end this recovery */
+ /**
+ * Run last procedures to end this recovery.
+ *
+ * @throws WALRecoverException when failing to flush the recovered memTable.
+ */
public void endRecovery() throws WALRecoverException {
// skip update info when this TsFile is not crashed
if (hasCrashed()) {
@@ -246,7 +254,8 @@
// set recover progress index for pipe
PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(tsFileResource);
- // if we put following codes in if clause above, this file can be continued writing into it
+ // if we put following codes in the 'if' clause above, this file can be continued writing
+ // into it
// currently, we close this file anyway
writer.endFile();
tsFileResource.serialize();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java
index e63f45f..51ba1a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils;
import java.io.File;
@@ -29,25 +30,23 @@
import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_VERSION_ID;
public class CheckpointFileUtils {
- /**
- * versionId is a self-incremented id number, helping to maintain the order of checkpoint files
- */
+ // versionId is a self-incremented id number, helping to maintain the order of checkpoint files
public static final Pattern CHECKPOINT_FILE_NAME_PATTERN =
Pattern.compile(
String.format(
"%s(?<%s>\\d+)\\%s$", WAL_FILE_PREFIX, WAL_VERSION_ID, WAL_CHECKPOINT_FILE_SUFFIX));
- /** Return true when this file is .checkpoint file */
+ /** Return true when this file is .checkpoint file. */
public static boolean checkpointFilenameFilter(File dir, String name) {
return CHECKPOINT_FILE_NAME_PATTERN.matcher(name).find();
}
- /** List all .checkpoint files in the directory */
+ /** List all .checkpoint files in the directory. */
public static File[] listAllCheckpointFiles(File dir) {
return dir.listFiles(CheckpointFileUtils::checkpointFilenameFilter);
}
- /** Parse version id from filename */
+ /** Parse version id from filename. */
public static int parseVersionId(String filename) {
Matcher matcher = CHECKPOINT_FILE_NAME_PATTERN.matcher(filename);
if (matcher.find()) {
@@ -56,14 +55,14 @@
throw new RuntimeException("Invalid checkpoint file name: " + filename);
}
- /** Sort checkpoint files by version id with descending order * */
+ /** Sort checkpoint files by version id with descending order. */
public static void descSortByVersionId(File[] checkpointFiles) {
Arrays.sort(
checkpointFiles,
Comparator.comparingInt(file -> parseVersionId(((File) file).getName())).reversed());
}
- /** Get .checkpoint filename */
+ /** Get .checkpoint filename. */
public static String getLogFileName(long version) {
return WAL_FILE_PREFIX + version + WAL_CHECKPOINT_FILE_SUFFIX;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
index 52324b5..dda0ad8 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
@@ -35,11 +36,13 @@
private static final Logger logger = LoggerFactory.getLogger(WALEntryHandler.class);
private long memTableId = -1;
- /** cached value, null after this value is flushed to wal successfully */
+ // cached value, null after this value is flushed to wal successfully
private volatile WALEntryValue value;
- /** wal entry's position in the wal, valid after the value is flushed to wal successfully */
+ // wal entry's position in the wal, valid after the value is flushed to wal successfully
+ // it's safe to use volatile here to make this reference thread-safe.
+ @SuppressWarnings("squid:S3077")
private final WALEntryPosition walEntryPosition = new WALEntryPosition();
- /** wal node, null when wal is disabled */
+ // wal node, null when wal is disabled
private WALNode walNode = null;
public WALEntryHandler(WALEntryValue value) {
@@ -71,7 +74,11 @@
walNode.unpinMemTable(memTableId);
}
- /** Get this handler's value */
+ /**
+ * Get this handler's value.
+ *
+ * @throws WALPipeException when failing to get the value.
+ */
public InsertNode getValue() throws WALPipeException {
// return local cache
WALEntryValue res = value;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java
index acb66a3..9f928cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
@@ -29,8 +30,8 @@
import java.util.Objects;
/**
- * This class uses the tuple(file, position, size) to denote the position of the wal entry, and give
- * some methods to read the content from the disk.
+ * This class uses the tuple(identifier, file, position) to denote the position of the wal entry,
+ * and give some methods to read the content from the disk.
*/
public class WALEntryPosition {
private static final WALInsertNodeCache CACHE = WALInsertNodeCache.getInstance();
@@ -38,9 +39,9 @@
private volatile long walFileVersionId = -1;
private volatile long position;
private volatile int size;
- /** wal node, null when wal is disabled */
+ // wal node, null when wal is disabled
private WALNode walNode = null;
- /** wal file is not null when openReadFileChannel method has been called */
+ // wal file is not null when openReadFileChannel method has been called
private File walFile = null;
public WALEntryPosition() {}
@@ -52,7 +53,11 @@
this.size = size;
}
- /** Read the wal entry and parse it to the InsertNode. Use LRU cache to accelerate read. */
+ /**
+ * Read the wal entry and parse it to the InsertNode. Use LRU cache to accelerate read.
+ *
+ * @throws IOException failing to read.
+ */
public InsertNode readInsertNodeViaCache() throws IOException {
if (!canRead()) {
throw new IOException("This entry isn't ready for read.");
@@ -60,7 +65,11 @@
return CACHE.get(this);
}
- /** Read the byte buffer directly. */
+ /**
+ * Read the byte buffer directly.
+ *
+ * @throws IOException failing to read.
+ */
ByteBuffer read() throws IOException {
if (!canRead()) {
throw new IOException("Target file hasn't been specified.");
@@ -75,8 +84,10 @@
}
/**
- * open the read file channel for this wal entry, this method will retry automatically when the
- * file is sealed when opening the file channel
+ * Open the read file channel for this wal entry, this method will retry automatically when the
+ * file is sealed when opening the file channel.
+ *
+ * @throws IOException failing to open the file channel.
*/
public FileChannel openReadFileChannel() throws IOException {
if (isInSealedFile()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileStatus.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileStatus.java
index f843e30..60bc873 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileStatus.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils;
/**
@@ -23,9 +24,9 @@
* this contains search index.
*/
public enum WALFileStatus {
- /** This file doesn't contain content needs searching */
+ // This file doesn't contain content needs searching
CONTAINS_NONE_SEARCH_INDEX(0),
- /** This file contains content needs searching */
+ // This file contains content needs searching
CONTAINS_SEARCH_INDEX(1),
;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
index 16efb6b..fc06564 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils;
import java.io.File;
@@ -61,17 +62,17 @@
+ "%d"
+ WAL_FILE_SUFFIX;
- /** Return true when this file is .wal file */
+ /** Return true when this file is .wal file. */
public static boolean walFilenameFilter(File dir, String name) {
return WAL_FILE_NAME_PATTERN.matcher(name).find();
}
- /** List all .wal files in the directory */
+ /** List all .wal files in the directory. */
public static File[] listAllWALFiles(File dir) {
return dir.listFiles(WALFileUtils::walFilenameFilter);
}
- /** Get the .wal file starts with the specified version id in the directory */
+ /** Get the .wal file starts with the specified version id in the directory. */
public static File getWALFile(File dir, long versionId) {
String filePrefix = WAL_FILE_PREFIX + versionId + FILE_NAME_SEPARATOR;
File[] files =
@@ -82,7 +83,7 @@
return files[0];
}
- /** Parse version id from filename */
+ /** Parse version id from filename. */
public static long parseVersionId(String filename) {
Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
if (matcher.find()) {
@@ -91,7 +92,7 @@
throw new RuntimeException("Invalid wal file name: " + filename);
}
- /** Parse start search index from filename */
+ /** Parse start search index from filename. */
public static long parseStartSearchIndex(String filename) {
Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
if (matcher.find()) {
@@ -100,7 +101,7 @@
throw new RuntimeException("Invalid wal file name: " + filename);
}
- /** Parse status code from filename */
+ /** Parse status code from filename. */
public static WALFileStatus parseStatusCode(String filename) {
Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
if (matcher.find()) {
@@ -109,7 +110,7 @@
throw new RuntimeException("Invalid wal file name: " + filename);
}
- /** Sort wal files by version id with ascending order */
+ /** Sort wal files by version id with ascending order. */
public static void ascSortByVersionId(File[] walFiles) {
Arrays.sort(walFiles, Comparator.comparingLong(file -> parseVersionId(file.getName())));
}
@@ -159,7 +160,7 @@
return low - 1;
}
- /** Get .wal filename */
+ /** Get .wal filename. */
public static String getLogFileName(long versionId, long startSearchIndex, WALFileStatus status) {
return String.format(WAL_FILE_NAME_FORMAT, versionId, startSearchIndex, status.getCode());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java
index 2a38102..bbf84b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -44,14 +45,14 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-/** This cache is used by {@link WALEntryPosition} */
+/** This cache is used by {@link WALEntryPosition}. */
public class WALInsertNodeCache {
private static final Logger logger = LoggerFactory.getLogger(WALInsertNodeCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- /** LRU cache, find InsertNode by WALEntryPosition */
- private final LoadingCache<WALEntryPosition, InsertNode> lruCache;
- /** ids of all pinned memTables */
+ // LRU cache, find InsertNode by WALEntryPosition
+ private final LoadingCache<WALEntryPosition, InsertNode> lruCache;
+ // ids of all pinned memTables
private final Set<Long> memTablesNeedSearch = ConcurrentHashMap.newKeySet();
private WALInsertNodeCache() {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALMode.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALMode.java
index 53faec5..c174eee 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALMode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALMode.java
@@ -16,20 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils;
-/** Write mode of wal */
+/** Write mode of wal. */
public enum WALMode {
- /** disable wal */
+ // disable wal
DISABLE,
- /**
- * submit wal synchronously, write request will not return until its wal is flushed to the disk
- * successfully
- */
+ // submit wal synchronously
+ // write request will not return until its wal is flushed to the disk successfully
SYNC,
- /**
- * submit wal asynchronously, write request will return immediately no matter its wal is flushed
- * to the disk successfully
- */
+ // submit wal asynchronously
+ // write request will return immediately no matter its wal is flushed
ASYNC,
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
index 5eb016c..59598de 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -28,7 +29,7 @@
import java.util.Map;
-/** Like {@link org.apache.iotdb.tsfile.utils.ReadWriteIOUtils} */
+/** Like {@link org.apache.iotdb.tsfile.utils.ReadWriteIOUtils}. */
public class WALWriteUtils {
public static final int BOOLEAN_LEN = ReadWriteIOUtils.BOOLEAN_LEN;
public static final int SHORT_LEN = ReadWriteIOUtils.SHORT_LEN;
@@ -41,7 +42,7 @@
private WALWriteUtils() {}
- /** write a byte to byteBuffer according to flag. If flag is true, write 1, else write 0. */
+ /** Write a byte to byteBuffer according to flag. If flag is true, write 1, else write 0. */
public static int write(Boolean flag, IWALByteBufferView buffer) {
byte a;
if (Boolean.TRUE.equals(flag)) {
@@ -55,7 +56,7 @@
}
/**
- * write a byte n to byteBuffer.
+ * Write a byte n to byteBuffer.
*
* @return The number of bytes used to represent a {@code byte} value in two's complement binary
* form.
@@ -66,7 +67,7 @@
}
/**
- * write a short n to byteBuffer.
+ * Write a short n to byteBuffer.
*
* @return The number of bytes used to represent n.
*/
@@ -76,7 +77,7 @@
}
/**
- * write a short n to byteBuffer.
+ * Write a short n to byteBuffer.
*
* @return The number of bytes used to represent n.
*/
@@ -87,7 +88,7 @@
}
/**
- * write a int n to byteBuffer.
+ * Write a int n to byteBuffer.
*
* @return The number of bytes used to represent n.
*/
@@ -96,26 +97,26 @@
return INT_LEN;
}
- /** write a long n to byteBuffer. */
+ /** Write a long n to byteBuffer. */
public static int write(long n, IWALByteBufferView buffer) {
buffer.putLong(n);
return LONG_LEN;
}
- /** write a float n to byteBuffer. */
+ /** Write a float n to byteBuffer. */
public static int write(float n, IWALByteBufferView buffer) {
buffer.putFloat(n);
return FLOAT_LEN;
}
- /** write a double n to byteBuffer. */
+ /** Write a double n to byteBuffer. */
public static int write(double n, IWALByteBufferView buffer) {
buffer.putDouble(n);
return DOUBLE_LEN;
}
/**
- * write string to byteBuffer.
+ * Write string to byteBuffer.
*
* @return the length of string represented by byte[].
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
index 8b19dc4..3ae87b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils.listener;
import org.slf4j.Logger;
@@ -25,10 +26,12 @@
public abstract class AbstractResultListener {
private static final Logger logger = LoggerFactory.getLogger(AbstractResultListener.class);
- /** true means waiting until getting the result */
+ // true means waiting until getting the result
protected final boolean wait;
protected volatile Status status;
+ // it's safe to use volatile here to make this reference thread-safe.
+ @SuppressWarnings("squid:S3077")
protected volatile Exception cause;
protected AbstractResultListener(boolean wait) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
index 2fecd1f..7fd8219 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils.listener;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALRecoverListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALRecoverListener.java
index 777e18f..4bb62fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALRecoverListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALRecoverListener.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.wal.utils.listener;
/** This class helps judge whether some TsFile is recovered. */
public class WALRecoverListener extends AbstractResultListener {
- /** path of recovering TsFile */
+ // path of recovering TsFile
private final String filePath;
public WALRecoverListener(String filePath) {