| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.storageengine.dataregion.wal.recover; |
| |
| import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch; |
| import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; |
| import org.apache.iotdb.commons.concurrent.ThreadName; |
| import org.apache.iotdb.commons.conf.CommonConfig; |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.exception.StartupException; |
| import org.apache.iotdb.commons.file.SystemFileFactory; |
| import org.apache.iotdb.commons.utils.TestOnly; |
| import org.apache.iotdb.db.exception.DataRegionException; |
| import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALRecoverException; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.UnsealedTsFileRecoverPerformer; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| |
| import static org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils.getTsFileRelativePath; |
| |
| /** First set allVsgScannedLatch, then call recover method. */ |
| public class WALRecoverManager { |
| private static final Logger logger = LoggerFactory.getLogger(WALRecoverManager.class); |
| private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); |
| |
| // true when the recover procedure has started |
| private volatile boolean hasStarted = false; |
| |
| // start recovery after all data regions have submitted unsealed zero-level TsFiles |
| @SuppressWarnings("squid:S3077") |
| private volatile ExceptionalCountDownLatch allDataRegionScannedLatch; |
| |
| // threads to recover wal nodes |
| private ExecutorService recoverThreadPool; |
| // stores all UnsealedTsFileRecoverPerformer submitted by data region processors |
| private final Map<String, UnsealedTsFileRecoverPerformer> absolutePath2RecoverPerformer = |
| new ConcurrentHashMap<>(); |
| |
| private WALRecoverManager() {} |
| |
| public void recover() throws WALRecoverException, StartupException { |
| logger.info("Start recovering wal."); |
| try { |
| // collect wal nodes' information |
| List<File> walNodeDirs = new ArrayList<>(); |
| for (String walDir : commonConfig.getWalDirs()) { |
| File walDirFile = SystemFileFactory.INSTANCE.getFile(walDir); |
| File[] nodeDirs = walDirFile.listFiles(File::isDirectory); |
| if (nodeDirs == null) { |
| continue; |
| } |
| for (File nodeDir : nodeDirs) { |
| if (nodeDir.isDirectory()) { |
| walNodeDirs.add(nodeDir); |
| } |
| } |
| } |
| // wait until all data regions have submitted their unsealed TsFiles, |
| // which means walRecoverManger.addRecoverPerformer method won't be call anymore |
| try { |
| allDataRegionScannedLatch.await(); |
| if (allDataRegionScannedLatch.hasException()) { |
| throw new DataRegionException(allDataRegionScannedLatch.getExceptionMessage()); |
| } |
| hasStarted = true; |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new WALRecoverException("Fail to recover wal.", e); |
| } |
| logger.info( |
| "Data regions have submitted all unsealed TsFiles, start recovering TsFiles in each wal node."); |
| // recover each wal node's TsFiles |
| if (!walNodeDirs.isEmpty()) { |
| recoverThreadPool = |
| IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.WAL_RECOVER.getName()); |
| CountDownLatch allNodesRecoveredLatch = new CountDownLatch(walNodeDirs.size()); |
| for (File walNodeDir : walNodeDirs) { |
| recoverThreadPool.submit(new WALNodeRecoverTask(walNodeDir, allNodesRecoveredLatch)); |
| } |
| |
| try { |
| allNodesRecoveredLatch.await(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new WALRecoverException("Fail to recover wal.", e); |
| } |
| } |
| // deal with remaining TsFiles which don't have wal |
| asyncRecoverLeftTsFiles(); |
| } catch (DataRegionException e) { |
| throw new StartupException(e.getMessage()); |
| } catch (Exception e) { |
| for (UnsealedTsFileRecoverPerformer recoverPerformer : |
| absolutePath2RecoverPerformer.values()) { |
| recoverPerformer.getRecoverListener().fail(e); |
| } |
| } finally { |
| for (UnsealedTsFileRecoverPerformer recoverPerformer : |
| absolutePath2RecoverPerformer.values()) { |
| try { |
| if (!recoverPerformer.canWrite()) { |
| recoverPerformer.close(); |
| } |
| } catch (Exception e) { |
| // continue |
| } |
| } |
| stop(); |
| } |
| logger.info("Successfully recover all wal nodes."); |
| } |
| |
| private void asyncRecoverLeftTsFiles() { |
| if (absolutePath2RecoverPerformer.isEmpty()) { |
| return; |
| } |
| |
| List<Future<Void>> futures = new ArrayList<>(); |
| ExecutorService recoverTsFilesThreadPool = |
| IoTDBThreadPoolFactory.newFixedThreadPool( |
| Runtime.getRuntime().availableProcessors(), ThreadName.TSFILE_RECOVER.getName()); |
| // async recover |
| for (UnsealedTsFileRecoverPerformer recoverPerformer : absolutePath2RecoverPerformer.values()) { |
| Callable<Void> recoverTsFileTask = |
| () -> { |
| try { |
| recoverPerformer.startRecovery(); |
| // skip redo logs because it doesn't belong to any wal node |
| recoverPerformer.endRecovery(); |
| recoverPerformer.getRecoverListener().succeed(); |
| } catch (DataRegionException | IOException | WALRecoverException e) { |
| logger.error( |
| "Fail to recover unsealed TsFile {}, skip it.", |
| recoverPerformer.getTsFileAbsolutePath(), |
| e); |
| recoverPerformer.getRecoverListener().fail(e); |
| } |
| return null; |
| }; |
| futures.add(recoverTsFilesThreadPool.submit(recoverTsFileTask)); |
| } |
| // wait until all tasks done |
| for (Future<Void> future : futures) { |
| try { |
| future.get(); |
| } catch (ExecutionException e) { |
| throw new StorageEngineFailureException("StorageEngine failed to recover.", e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new StorageEngineFailureException("StorageEngine failed to recover.", e); |
| } |
| } |
| recoverTsFilesThreadPool.shutdown(); |
| } |
| |
| public WALRecoverListener addRecoverPerformer(UnsealedTsFileRecoverPerformer recoverPerformer) { |
| if (hasStarted) { |
| logger.error("Cannot recover tsfile from wal because wal recovery has already started"); |
| return null; |
| } else { |
| try { |
| String tsFileRelativePath = |
| getTsFileRelativePath( |
| recoverPerformer.getTsFileResource().getTsFile().getCanonicalPath()); |
| absolutePath2RecoverPerformer.put(tsFileRelativePath, recoverPerformer); |
| } catch (IOException e) { |
| logger.error( |
| "Fail to add recover performer for file {}", |
| recoverPerformer.getTsFileAbsolutePath(), |
| e); |
| } |
| } |
| return recoverPerformer.getRecoverListener(); |
| } |
| |
| UnsealedTsFileRecoverPerformer removeRecoverPerformer(File file) { |
| try { |
| return absolutePath2RecoverPerformer.remove(getTsFileRelativePath(file.getCanonicalPath())); |
| } catch (IOException e) { |
| logger.error("Fail to remove recover performer for file {}", file, e); |
| } |
| return null; |
| } |
| |
| public ExceptionalCountDownLatch getAllDataRegionScannedLatch() { |
| return allDataRegionScannedLatch; |
| } |
| |
| public void setAllDataRegionScannedLatch(ExceptionalCountDownLatch allDataRegionScannedLatch) { |
| this.allDataRegionScannedLatch = allDataRegionScannedLatch; |
| } |
| |
| public void stop() { |
| absolutePath2RecoverPerformer.clear(); |
| if (recoverThreadPool != null) { |
| recoverThreadPool.shutdown(); |
| recoverThreadPool = null; |
| } |
| } |
| |
| @TestOnly |
| public void clear() { |
| stop(); |
| hasStarted = false; |
| } |
| |
| public static WALRecoverManager getInstance() { |
| return InstanceHolder.INSTANCE; |
| } |
| |
| private static class InstanceHolder { |
| private InstanceHolder() {} |
| |
| private static final WALRecoverManager INSTANCE = new WALRecoverManager(); |
| } |
| } |