blob: 237b353c9f9aaeca0f7edf85d8c98423e92c33a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.recover;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader;
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.UnsealedTsFileRecoverPerformer;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.CheckpointFileUtils;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.iotdb.consensus.iot.log.ConsensusReqReader.DEFAULT_SEARCH_INDEX;
/** This task is responsible for the recovery of one wal node. */
public class WALNodeRecoverTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(WALNodeRecoverTask.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final WALRecoverManager walRecoverManger = WALRecoverManager.getInstance();
// this directory store one wal node's .wal and .checkpoint files
private final File logDirectory;
// latch to collect all nodes' recovery end information
private final CountDownLatch allNodesRecoveredLatch;
// version id of first valid .wal file
private long firstValidVersionId = Long.MAX_VALUE;
private Map<Long, MemTableInfo> memTableId2Info;
private Map<Long, UnsealedTsFileRecoverPerformer> memTableId2RecoverPerformer;
public WALNodeRecoverTask(File logDirectory, CountDownLatch allNodesRecoveredLatch) {
this.logDirectory = logDirectory;
this.allNodesRecoveredLatch = allNodesRecoveredLatch;
}
@Override
public void run() {
logger.info("Start recovering WAL node in the directory {}", logDirectory);
// recover version id and search index
long[] indexInfo = recoverLastFile();
long lastVersionId = indexInfo[0];
long lastSearchIndex = indexInfo[1];
try {
recoverInfoFromCheckpoints();
recoverTsFiles();
} catch (Exception e) {
for (UnsealedTsFileRecoverPerformer recoverPerformer : memTableId2RecoverPerformer.values()) {
recoverPerformer.getRecoverListener().fail(e);
}
} finally {
for (UnsealedTsFileRecoverPerformer recoverPerformer : memTableId2RecoverPerformer.values()) {
try {
if (!recoverPerformer.canWrite()) {
recoverPerformer.close();
}
} catch (Exception e) {
// continue
}
}
}
try {
if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
// delete this wal node folder
FileUtils.deleteFileOrDirectory(logDirectory);
logger.info(
"Successfully recover WAL node in the directory {}, so delete these wal files.",
logDirectory);
} else {
// delete checkpoint info to avoid repeated recover
File[] checkpointFiles = CheckpointFileUtils.listAllCheckpointFiles(logDirectory);
for (File checkpointFile : checkpointFiles) {
try {
Files.delete(checkpointFile.toPath());
} catch (IOException e) {
logger.error("error when delete checkpoint file. {}", checkpointFile, e);
}
}
// register wal node
WALManager.getInstance()
.registerWALNode(
logDirectory.getName(),
logDirectory.getAbsolutePath(),
lastVersionId + 1,
lastSearchIndex);
logger.info(
"Successfully recover WAL node in the directory {}, add this node to WALManger.",
logDirectory);
}
} finally {
allNodesRecoveredLatch.countDown();
}
}
private long[] recoverLastFile() {
File[] walFiles = WALFileUtils.listAllWALFiles(logDirectory);
if (walFiles == null || walFiles.length == 0) {
return new long[] {0L, 0L};
}
// get last search index from last wal file
WALFileUtils.ascSortByVersionId(walFiles);
File lastWALFile = walFiles[walFiles.length - 1];
long lastVersionId = WALFileUtils.parseVersionId(lastWALFile.getName());
long lastSearchIndex = WALFileUtils.parseStartSearchIndex(lastWALFile.getName());
WALMetaData metaData = new WALMetaData(lastSearchIndex, new ArrayList<>(), new HashSet<>());
WALFileStatus fileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
try (WALReader walReader = new WALReader(lastWALFile, true)) {
while (walReader.hasNext()) {
WALEntry walEntry = walReader.next();
long searchIndex = DEFAULT_SEARCH_INDEX;
if (walEntry.getType().needSearch()) {
if (walEntry.getType() != WALEntryType.DELETE_DATA_NODE) {
InsertNode insertNode = (InsertNode) walEntry.getValue();
if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
searchIndex = insertNode.getSearchIndex();
lastSearchIndex = Math.max(lastSearchIndex, insertNode.getSearchIndex());
fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
} else {
DeleteDataNode deleteNode = (DeleteDataNode) walEntry.getValue();
if (deleteNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
searchIndex = deleteNode.getSearchIndex();
lastSearchIndex = Math.max(lastSearchIndex, deleteNode.getSearchIndex());
fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
}
}
metaData.add(walEntry.serializedSize(), searchIndex, walEntry.getMemTableId());
}
} catch (Exception e) {
logger.warn("Fail to read wal logs from {}, skip them", lastWALFile, e);
}
// make sure last wal file is correct
WALRecoverWriter walRecoverWriter = new WALRecoverWriter(lastWALFile);
try {
walRecoverWriter.recover(metaData);
} catch (IOException e) {
logger.error("Fail to recover metadata of wal file {}", lastWALFile);
}
// rename last wal file when file status are inconsistent
if (WALFileUtils.parseStatusCode(lastWALFile.getName()) != fileStatus) {
String targetName =
WALFileUtils.getLogFileName(
WALFileUtils.parseVersionId(lastWALFile.getName()),
WALFileUtils.parseStartSearchIndex(lastWALFile.getName()),
fileStatus);
if (!lastWALFile.renameTo(SystemFileFactory.INSTANCE.getFile(logDirectory, targetName))) {
logger.error("Fail to rename file {} to {}", lastWALFile, targetName);
}
}
return new long[] {lastVersionId, lastSearchIndex};
}
private void recoverInfoFromCheckpoints() {
// parse memTables information
CheckpointRecoverUtils.CheckpointInfo info =
CheckpointRecoverUtils.recoverMemTableInfo(logDirectory);
memTableId2Info = info.getMemTableId2Info();
memTableId2RecoverPerformer = new HashMap<>();
// update init memTable id
long maxMemTableId = info.getMaxMemTableId();
AtomicLong memTableIdCounter = AbstractMemTable.memTableIdCounter;
long oldVal = memTableIdCounter.get();
while (maxMemTableId > oldVal) {
if (!memTableIdCounter.compareAndSet(oldVal, maxMemTableId)) {
oldVal = memTableIdCounter.get();
}
}
// update firstValidVersionId and get recover performer from WALRecoverManager
for (MemTableInfo memTableInfo : memTableId2Info.values()) {
firstValidVersionId = Math.min(firstValidVersionId, memTableInfo.getFirstFileVersionId());
UnsealedTsFileRecoverPerformer recoverPerformer =
walRecoverManger.removeRecoverPerformer(new File(memTableInfo.getTsFilePath()));
if (recoverPerformer != null) {
memTableId2RecoverPerformer.put(memTableInfo.getMemTableId(), recoverPerformer);
}
}
}
private void recoverTsFiles() {
if (memTableId2RecoverPerformer.isEmpty()) {
return;
}
// make preparation for recovery
for (UnsealedTsFileRecoverPerformer recoverPerformer : memTableId2RecoverPerformer.values()) {
try {
recoverPerformer.startRecovery();
} catch (Exception e) {
recoverPerformer.getRecoverListener().fail(e);
}
}
// find all valid .wal files
File[] walFiles =
logDirectory.listFiles(
(dir, name) ->
WALFileUtils.walFilenameFilter(dir, name)
&& WALFileUtils.parseVersionId(name) >= firstValidVersionId);
if (walFiles == null) {
endRecovery();
return;
}
// asc sort by version id
WALFileUtils.ascSortByVersionId(walFiles);
// read .wal files and redo logs
for (int i = 0; i < walFiles.length; ++i) {
File walFile = walFiles[i];
try (WALByteBufReader reader = new WALByteBufReader(walFile)) {
if (Collections.disjoint(memTableId2Info.keySet(), reader.getMetaData().getMemTablesId())) {
continue;
}
while (reader.hasNext()) {
ByteBuffer buffer = reader.next();
// see WALInfoEntry#serialize, entry type
buffer.position(Byte.BYTES);
long memTableId = buffer.getLong();
if (!memTableId2Info.containsKey(memTableId)) {
continue;
}
buffer.clear();
WALEntry walEntry =
WALEntry.deserialize(new DataInputStream(new ByteArrayInputStream(buffer.array())));
UnsealedTsFileRecoverPerformer recoverPerformer =
memTableId2RecoverPerformer.get(walEntry.getMemTableId());
if (recoverPerformer != null) {
recoverPerformer.redoLog(walEntry);
} else {
logger.debug(
"Fail to find TsFile recover performer for wal entry in TsFile {}", walFile);
}
}
} catch (Exception e) {
logger.warn("Fail to read wal logs from {}, skip them", walFile, e);
}
}
endRecovery();
}
private void endRecovery() {
// end recovering all recover performers
for (UnsealedTsFileRecoverPerformer recoverPerformer : memTableId2RecoverPerformer.values()) {
try {
recoverPerformer.endRecovery();
recoverPerformer.getRecoverListener().succeed();
} catch (Exception e) {
recoverPerformer.getRecoverListener().fail(e);
}
}
}
}