| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.storageengine; |
| |
| import org.apache.iotdb.common.rpc.thrift.TFlushReq; |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; |
| import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch; |
| import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; |
| import org.apache.iotdb.commons.concurrent.ThreadName; |
| import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.conf.IoTDBConstant; |
| import org.apache.iotdb.commons.consensus.DataRegionId; |
| import org.apache.iotdb.commons.consensus.index.ProgressIndex; |
| import org.apache.iotdb.commons.exception.ShutdownException; |
| import org.apache.iotdb.commons.exception.StartupException; |
| import org.apache.iotdb.commons.file.SystemFileFactory; |
| import org.apache.iotdb.commons.service.IService; |
| import org.apache.iotdb.commons.service.ServiceType; |
| import org.apache.iotdb.commons.utils.TestOnly; |
| import org.apache.iotdb.commons.utils.TimePartitionUtils; |
| import org.apache.iotdb.consensus.ConsensusFactory; |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor; |
| import org.apache.iotdb.db.exception.DataRegionException; |
| import org.apache.iotdb.db.exception.LoadFileException; |
| import org.apache.iotdb.db.exception.LoadReadOnlyException; |
| import org.apache.iotdb.db.exception.StorageEngineException; |
| import org.apache.iotdb.db.exception.TsFileProcessorException; |
| import org.apache.iotdb.db.exception.WriteProcessRejectException; |
| import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException; |
| import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; |
| import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler; |
| import org.apache.iotdb.db.service.metrics.WritingMetrics; |
| import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache; |
| import org.apache.iotdb.db.storageengine.buffer.ChunkCache; |
| import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache; |
| import org.apache.iotdb.db.storageengine.dataregion.DataRegion; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairLogger; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.UnsortedFileRepairTaskScheduler; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager; |
| import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; |
| import org.apache.iotdb.db.utils.ThreadUtils; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| import org.apache.iotdb.tsfile.utils.FilePathUtils; |
| import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.ConcurrentModificationException; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR; |
| |
| public class StorageEngine implements IService { |
  private static final Logger LOGGER = LoggerFactory.getLogger(StorageEngine.class);

  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
  /** Period between two TTL checks, in milliseconds (one minute). */
  private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
  private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();

  /**
   * a folder (system/databases/ by default) that persist system info. Each database will have a
   * subfolder under the systemDir.
   */
  private final String systemDir =
      FilePathUtils.regularizePath(CONFIG.getSystemDir()) + "databases";

  /** DataRegionId -> DataRegion */
  private final ConcurrentHashMap<DataRegionId, DataRegion> dataRegionMap =
      new ConcurrentHashMap<>();

  /** DataRegionId -> DataRegion which is being deleted */
  private final ConcurrentHashMap<DataRegionId, DataRegion> deletingDataRegionMap =
      new ConcurrentHashMap<>();

  /** Database name -> ttl, for region recovery only */
  private final Map<String, Long> ttlMapForRecover = new ConcurrentHashMap<>();

  /** number of ready data region */
  private AtomicInteger readyDataRegionNum;

  // flips to true once every data region has finished recovery; see asyncRecover()
  private AtomicBoolean isAllSgReady = new AtomicBoolean(false);

  // periodic background services, created in start()/startTimedService()
  private ScheduledExecutorService ttlCheckThread;
  private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
  private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;

  // decides when memtables are flushed; DirectFlushPolicy flushes as soon as requested
  private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();

  /** used to do short-lived asynchronous tasks */
  private ExecutorService cachedThreadPool;

  // add customized listeners here for flush and close events
  private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
  private List<FlushListener> customFlushListeners = new ArrayList<>();
  // total number of data regions found on disk that need recovery
  private int recoverDataRegionNum = 0;

  // stages TsFile pieces received during LOAD statements
  private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager();

  // singleton: obtain via getInstance()
  private StorageEngine() {}
| |
  /** @return the process-wide singleton instance (lazily created by InstanceHolder). */
  public static StorageEngine getInstance() {
    return InstanceHolder.INSTANCE;
  }

  /** Propagate the configured time-partition interval to TimePartitionUtils. */
  private static void initTimePartition() {
    TimePartitionUtils.setTimePartitionInterval(
        CommonDescriptor.getInstance().getConfig().getTimePartitionInterval());
  }
| |
| /** block insertion if the insertion is rejected by memory control */ |
| public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor) |
| throws WriteProcessRejectException { |
| long startTime = System.currentTimeMillis(); |
| while (SystemInfo.getInstance().isRejected()) { |
| if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) { |
| break; |
| } |
| try { |
| TimeUnit.MILLISECONDS.sleep(CONFIG.getCheckPeriodWhenInsertBlocked()); |
| if (System.currentTimeMillis() - startTime > CONFIG.getMaxWaitingTimeWhenInsertBlocked()) { |
| throw new WriteProcessRejectException( |
| "System rejected over " + (System.currentTimeMillis() - startTime) + "ms"); |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| public void updateTTLInfo(byte[] allTTLInformation) { |
| if (allTTLInformation == null) { |
| return; |
| } |
| ByteBuffer buffer = ByteBuffer.wrap(allTTLInformation); |
| int mapSize = ReadWriteIOUtils.readInt(buffer); |
| for (int i = 0; i < mapSize; i++) { |
| ttlMapForRecover.put( |
| Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)), |
| ReadWriteIOUtils.readLong(buffer)); |
| } |
| } |
| |
  /** @return true once every data region has been recovered and the engine accepts operations. */
  public boolean isAllSgReady() {
    return isAllSgReady.get();
  }

  /** Mark whether all data regions (storage groups) are ready. */
  public void setAllSgReady(boolean allSgReady) {
    isAllSgReady.set(allSgReady);
  }
| |
  /**
   * Recover all local data regions asynchronously: submit one recovery task per region to the
   * cached pool, replay the WAL (except under ratis consensus, which manages its own log), and
   * spawn a trigger thread that flips {@link #isAllSgReady} once every task has finished.
   *
   * @throws StartupException declared for the IService contract; per-region recovery failures
   *     are logged and skipped rather than thrown
   */
  public void asyncRecover() throws StartupException {
    setAllSgReady(false);
    cachedThreadPool =
        IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());

    List<Future<Void>> futures = new LinkedList<>();
    asyncRecover(futures);

    // wait until wal is recovered
    if (!CONFIG.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
      try {
        WALRecoverManager.getInstance().recover();
      } catch (WALException e) {
        LOGGER.error("Fail to recover wal.", e);
      }
    }

    // operations after all data regions are recovered
    Thread recoverEndTrigger =
        new Thread(
            () -> {
              // blocks until every submitted region-recovery future completes
              checkResults(futures, "StorageEngine failed to recover.");
              recoverRepairData();
              setAllSgReady(true);
              // the recovery-time TTL snapshot is no longer needed
              ttlMapForRecover.clear();
            },
            ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
    recoverEndTrigger.start();
  }

  /**
   * Scan the system directory for databases/regions and submit one recovery task per region.
   *
   * @param futures out-parameter collecting the submitted recovery futures
   */
  private void asyncRecover(List<Future<Void>> futures) {
    Map<String, List<DataRegionId>> localDataRegionInfo = getLocalDataRegionInfo();
    localDataRegionInfo.values().forEach(list -> recoverDataRegionNum += list.size());
    readyDataRegionNum = new AtomicInteger(0);
    // init wal recover manager
    WALRecoverManager.getInstance()
        .setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(recoverDataRegionNum));
    for (Map.Entry<String, List<DataRegionId>> entry : localDataRegionInfo.entrySet()) {
      String sgName = entry.getKey();
      for (DataRegionId dataRegionId : entry.getValue()) {
        Callable<Void> recoverDataRegionTask =
            () -> {
              DataRegion dataRegion = null;
              try {
                dataRegion =
                    buildNewDataRegion(
                        sgName,
                        dataRegionId,
                        // databases absent from the snapshot get an infinite TTL
                        ttlMapForRecover.getOrDefault(sgName, Long.MAX_VALUE));
              } catch (DataRegionException e) {
                // a failed region is logged and skipped; it never enters dataRegionMap
                LOGGER.error(
                    "Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e);
                return null;
              }
              dataRegionMap.put(dataRegionId, dataRegion);
              LOGGER.info(
                  "Data regions have been recovered {}/{}",
                  readyDataRegionNum.incrementAndGet(),
                  recoverDataRegionNum);
              return null;
            };
        futures.add(cachedThreadPool.submit(recoverDataRegionTask));
      }
    }
  }
| |
| /** get StorageGroup -> DataRegionIdList map from data/system directory. */ |
| public Map<String, List<DataRegionId>> getLocalDataRegionInfo() { |
| File system = SystemFileFactory.INSTANCE.getFile(systemDir); |
| File[] sgDirs = system.listFiles(); |
| Map<String, List<DataRegionId>> localDataRegionInfo = new HashMap<>(); |
| if (sgDirs == null) { |
| return localDataRegionInfo; |
| } |
| for (File sgDir : sgDirs) { |
| if (!sgDir.isDirectory()) { |
| continue; |
| } |
| String sgName = sgDir.getName(); |
| List<DataRegionId> dataRegionIdList = new ArrayList<>(); |
| for (File dataRegionDir : sgDir.listFiles()) { |
| if (!dataRegionDir.isDirectory()) { |
| continue; |
| } |
| dataRegionIdList.add(new DataRegionId(Integer.parseInt(dataRegionDir.getName()))); |
| } |
| localDataRegionInfo.put(sgName, dataRegionIdList); |
| } |
| return localDataRegionInfo; |
| } |
| |
  /**
   * Start the engine: configure time partitioning, create the system directory, kick off async
   * region recovery, and launch the periodic TTL checker and timed-flush services.
   *
   * @throws StartupException if async recovery cannot be scheduled
   */
  @Override
  public void start() throws StartupException {
    // build time Interval to divide time partition
    initTimePartition();
    // create systemDir
    try {
      FileUtils.forceMkdir(SystemFileFactory.INSTANCE.getFile(systemDir));
    } catch (IOException e) {
      throw new StorageEngineFailureException(e);
    }

    asyncRecover();

    ttlCheckThread =
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.TTL_CHECK.getName());
    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
        ttlCheckThread,
        this::checkTTL,
        TTL_CHECK_INTERVAL,
        TTL_CHECK_INTERVAL,
        TimeUnit.MILLISECONDS);
    LOGGER.info("start ttl check thread successfully.");

    startTimedService();
  }

  /** Periodic task: let every live region drop files whose data has outlived its TTL. */
  private void checkTTL() {
    try {
      for (DataRegion dataRegion : dataRegionMap.values()) {
        if (dataRegion != null) {
          dataRegion.checkFilesTTL();
        }
      }
    } catch (ConcurrentModificationException e) {
      // ignore: regions may be added/removed concurrently; the next run will pick them up
    } catch (Exception e) {
      LOGGER.error("An error occurred when checking TTL", e);
    }
  }
| |
| private void startTimedService() { |
| // timed flush sequence memtable |
| if (CONFIG.isEnableTimedFlushSeqMemtable()) { |
| seqMemtableTimedFlushCheckThread = |
| IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( |
| ThreadName.TIMED_FLUSH_SEQ_MEMTABLE.getName()); |
| ScheduledExecutorUtil.safelyScheduleAtFixedRate( |
| seqMemtableTimedFlushCheckThread, |
| this::timedFlushSeqMemTable, |
| CONFIG.getSeqMemtableFlushCheckInterval(), |
| CONFIG.getSeqMemtableFlushCheckInterval(), |
| TimeUnit.MILLISECONDS); |
| LOGGER.info("start sequence memtable timed flush check thread successfully."); |
| } |
| // timed flush unsequence memtable |
| if (CONFIG.isEnableTimedFlushUnseqMemtable()) { |
| unseqMemtableTimedFlushCheckThread = |
| IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( |
| ThreadName.TIMED_FLUSH_UNSEQ_MEMTABLE.getName()); |
| ScheduledExecutorUtil.safelyScheduleAtFixedRate( |
| unseqMemtableTimedFlushCheckThread, |
| this::timedFlushUnseqMemTable, |
| CONFIG.getUnseqMemtableFlushCheckInterval(), |
| CONFIG.getUnseqMemtableFlushCheckInterval(), |
| TimeUnit.MILLISECONDS); |
| LOGGER.info("start unsequence memtable timed flush check thread successfully."); |
| } |
| } |
| |
| private void timedFlushSeqMemTable() { |
| for (DataRegion dataRegion : dataRegionMap.values()) { |
| if (dataRegion != null) { |
| dataRegion.timedFlushSeqMemTable(); |
| } |
| } |
| } |
| |
| private void timedFlushUnseqMemTable() { |
| for (DataRegion dataRegion : dataRegionMap.values()) { |
| if (dataRegion != null) { |
| dataRegion.timedFlushUnseqMemTable(); |
| } |
| } |
| } |
| |
  /**
   * Stop the engine (best-effort, non-throwing variant): unregister regions from compaction
   * scheduling, sync-close all working processors, then stop the background pools.
   */
  @Override
  public void stop() {
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        CompactionScheduleTaskManager.getInstance().unregisterDataRegion(dataRegion);
      }
    }
    syncCloseAllProcessor();
    ThreadUtils.stopThreadPool(ttlCheckThread, ThreadName.TTL_CHECK);
    ThreadUtils.stopThreadPool(
        seqMemtableTimedFlushCheckThread, ThreadName.TIMED_FLUSH_SEQ_MEMTABLE);
    ThreadUtils.stopThreadPool(
        unseqMemtableTimedFlushCheckThread, ThreadName.TIMED_FLUSH_UNSEQ_MEMTABLE);
    // cachedThreadPool is created lazily in asyncRecover(), so it may still be null here
    if (cachedThreadPool != null) {
      cachedThreadPool.shutdownNow();
    }
    dataRegionMap.clear();
  }
| |
| @Override |
| public void shutdown(long milliseconds) throws ShutdownException { |
| try { |
| for (DataRegion dataRegion : dataRegionMap.values()) { |
| if (dataRegion != null) { |
| CompactionScheduleTaskManager.getInstance().unregisterDataRegion(dataRegion); |
| } |
| } |
| forceCloseAllProcessor(); |
| } catch (TsFileProcessorException e) { |
| throw new ShutdownException(e); |
| } |
| shutdownTimedService(ttlCheckThread, "TTlCheckThread"); |
| shutdownTimedService(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread"); |
| shutdownTimedService(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread"); |
| cachedThreadPool.shutdownNow(); |
| dataRegionMap.clear(); |
| } |
| |
| private void shutdownTimedService(ScheduledExecutorService pool, String poolName) { |
| if (pool != null) { |
| pool.shutdownNow(); |
| try { |
| pool.awaitTermination(30, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| LOGGER.warn("{} still doesn't exit after 30s", poolName); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
  /** @return the service type identifier used by the service registry. */
  @Override
  public ServiceType getID() {
    return ServiceType.STORAGE_ENGINE_SERVICE;
  }

  /**
   * build a new data region
   *
   * @param dataRegionId data region id e.g. 1
   * @param logicalStorageGroupName database name e.g. root.sg1
   * @param ttl data TTL applied to the new region (validated against the time precision)
   * @return the constructed region with per-region metrics and custom listeners attached
   * @throws DataRegionException if the region cannot be constructed
   */
  public DataRegion buildNewDataRegion(
      String logicalStorageGroupName, DataRegionId dataRegionId, long ttl)
      throws DataRegionException {
    DataRegion dataRegion;
    LOGGER.info(
        "construct a data region instance, the database is {}, Thread is {}",
        logicalStorageGroupName,
        Thread.currentThread().getId());
    dataRegion =
        new DataRegion(
            systemDir + File.separator + logicalStorageGroupName,
            String.valueOf(dataRegionId.getId()),
            fileFlushPolicy,
            logicalStorageGroupName);
    // register all per-region writing metrics
    WRITING_METRICS.createFlushingMemTableStatusMetrics(dataRegionId);
    WRITING_METRICS.createDataRegionMemoryCostMetrics(dataRegion);
    WRITING_METRICS.createSeriesFullFlushMemTableCounterMetrics(dataRegionId);
    WRITING_METRICS.createWalFlushMemTableCounterMetrics(dataRegionId);
    WRITING_METRICS.createTimedFlushMemTableCounterMetrics(dataRegionId);
    WRITING_METRICS.createActiveMemtableCounterMetrics(dataRegionId);
    dataRegion.setDataTTLWithTimePrecisionCheck(ttl);
    // listeners registered so far apply to processors this region creates afterwards
    dataRegion.setCustomFlushListeners(customFlushListeners);
    dataRegion.setCustomCloseFileListeners(customCloseFileListeners);
    return dataRegion;
  }

  /** Write data into DataRegion. For standalone mode only. */
  public TSStatus write(DataRegionId groupId, PlanNode planNode) {
    // NOTE(review): dataRegionMap.get may return null for an unknown region id — presumably
    // the visitor tolerates a null region; confirm before relying on it
    return planNode.accept(new DataExecutionVisitor(), dataRegionMap.get(groupId));
  }

  /** This function is just for unit test. */
  @TestOnly
  public synchronized void reset() {
    dataRegionMap.clear();
  }
| |
| /** flush command Sync asyncCloseOneProcessor all file node processors. */ |
| public void syncCloseAllProcessor() { |
| LOGGER.info("Start closing all database processor"); |
| List<Future<Void>> tasks = new ArrayList<>(); |
| for (DataRegion dataRegion : dataRegionMap.values()) { |
| if (dataRegion != null) { |
| tasks.add( |
| cachedThreadPool.submit( |
| () -> { |
| dataRegion.syncCloseAllWorkingTsFileProcessors(); |
| return null; |
| })); |
| } |
| } |
| checkResults(tasks, "Failed to sync close processor."); |
| } |
| |
| public void forceCloseAllProcessor() throws TsFileProcessorException { |
| LOGGER.info("Start force closing all database processor"); |
| List<Future<Void>> tasks = new ArrayList<>(); |
| for (DataRegion dataRegion : dataRegionMap.values()) { |
| if (dataRegion != null) { |
| tasks.add( |
| cachedThreadPool.submit( |
| () -> { |
| dataRegion.forceCloseAllWorkingTsFileProcessors(); |
| return null; |
| })); |
| } |
| } |
| checkResults(tasks, "Failed to force close processor."); |
| } |
| |
| public void closeStorageGroupProcessor(String storageGroupPath, boolean isSeq) { |
| List<Future<Void>> tasks = new ArrayList<>(); |
| for (DataRegion dataRegion : dataRegionMap.values()) { |
| if (dataRegion.getDatabaseName().equals(storageGroupPath)) { |
| if (isSeq) { |
| for (TsFileProcessor tsFileProcessor : dataRegion.getWorkSequenceTsFileProcessors()) { |
| tasks.add( |
| cachedThreadPool.submit( |
| () -> { |
| dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor); |
| return null; |
| })); |
| } |
| } else { |
| for (TsFileProcessor tsFileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) { |
| tasks.add( |
| cachedThreadPool.submit( |
| () -> { |
| dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor); |
| return null; |
| })); |
| } |
| } |
| } |
| } |
| checkResults(tasks, "Failed to close database processor."); |
| } |
| |
| private <V> void checkResults(List<Future<V>> tasks, String errorMsg) { |
| for (Future<V> task : tasks) { |
| try { |
| task.get(); |
| } catch (ExecutionException e) { |
| throw new StorageEngineFailureException(errorMsg, e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new StorageEngineFailureException(errorMsg, e); |
| } |
| } |
| } |
| |
| /** |
| * merge all databases. |
| * |
| * @throws StorageEngineException StorageEngineException |
| */ |
| public void mergeAll() throws StorageEngineException { |
| if (CommonDescriptor.getInstance().getConfig().isReadOnly()) { |
| throw new StorageEngineException("Current system mode is read only, does not support merge"); |
| } |
| dataRegionMap.values().forEach(DataRegion::compact); |
| } |
| |
| /** |
| * check and repair unsorted data by compaction. |
| * |
| * @throws StorageEngineException StorageEngineException |
| */ |
| public boolean repairData() throws StorageEngineException { |
| if (CommonDescriptor.getInstance().getConfig().isReadOnly()) { |
| throw new StorageEngineException("Current system mode is read only, does not support merge"); |
| } |
| if (!CompactionScheduleTaskManager.getRepairTaskManagerInstance().markRepairTaskStart()) { |
| return false; |
| } |
| LOGGER.info("start repair data"); |
| List<DataRegion> dataRegionList = new ArrayList<>(dataRegionMap.values()); |
| cachedThreadPool.submit(new UnsortedFileRepairTaskScheduler(dataRegionList, false)); |
| return true; |
| } |
| |
| /** |
| * stop repair data by interrupt |
| * |
| * @throws StorageEngineException StorageEngineException |
| */ |
| public void stopRepairData() throws StorageEngineException { |
| CompactionScheduleTaskManager.RepairDataTaskManager repairDataTaskManager = |
| CompactionScheduleTaskManager.getRepairTaskManagerInstance(); |
| if (!CompactionScheduleTaskManager.getRepairTaskManagerInstance().hasRunningRepairTask()) { |
| return; |
| } |
| LOGGER.info("stop repair data"); |
| try { |
| repairDataTaskManager.markRepairTaskStopping(); |
| repairDataTaskManager.abortRepairTask(); |
| } catch (IOException ignored) { |
| } |
| } |
| |
  /** recover the progress of unfinished repair schedule task */
  public void recoverRepairData() {
    List<DataRegion> dataRegionList = new ArrayList<>(dataRegionMap.values());
    // repair progress is persisted under <systemDir>/<repairLogDir>
    String repairLogDirPath =
        IoTDBDescriptor.getInstance().getConfig().getSystemDir()
            + File.separator
            + RepairLogger.repairLogDir;
    File repairLogDir = new File(repairLogDirPath);
    if (!repairLogDir.exists() || !repairLogDir.isDirectory()) {
      return;
    }
    File[] files = repairLogDir.listFiles();
    // keep only the two well-known progress marker files
    List<File> fileList =
        Stream.of(files == null ? new File[0] : files)
            .filter(
                f -> {
                  String fileName = f.getName();
                  return f.isFile()
                      && (RepairLogger.repairProgressFileName.equals(fileName)
                          || RepairLogger.repairProgressStoppedFileName.equals(fileName));
                })
            .collect(Collectors.toList());
    if (!fileList.isEmpty()) {
      // a marker exists -> a repair task was interrupted; resume it in recover mode
      CompactionScheduleTaskManager.getRepairTaskManagerInstance().markRepairTaskStart();
      cachedThreadPool.submit(new UnsortedFileRepairTaskScheduler(dataRegionList, true));
    }
  }
| |
| public void operateFlush(TFlushReq req) { |
| if (req.storageGroups == null) { |
| StorageEngine.getInstance().syncCloseAllProcessor(); |
| WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes(); |
| } else { |
| for (String storageGroup : req.storageGroups) { |
| if (req.isSeq == null) { |
| StorageEngine.getInstance().closeStorageGroupProcessor(storageGroup, true); |
| StorageEngine.getInstance().closeStorageGroupProcessor(storageGroup, false); |
| } else { |
| StorageEngine.getInstance() |
| .closeStorageGroupProcessor(storageGroup, Boolean.parseBoolean(req.isSeq)); |
| } |
| } |
| } |
| } |
| |
  /** Drop all read caches (chunk, timeseries-metadata, and bloom-filter caches). */
  public void clearCache() {
    ChunkCache.getInstance().clear();
    TimeSeriesMetadataCache.getInstance().clear();
    BloomFilterCache.getInstance().clear();
  }
| |
| public void setTTL(List<DataRegionId> dataRegionIdList, long dataTTL) { |
| for (DataRegionId dataRegionId : dataRegionIdList) { |
| DataRegion dataRegion = dataRegionMap.get(dataRegionId); |
| if (dataRegion != null) { |
| dataRegion.setDataTTLWithTimePrecisionCheck(dataTTL); |
| } |
| } |
| } |
| |
| /** |
| * Add a listener to listen flush start/end events. Notice that this addition only applies to |
| * TsFileProcessors created afterwards. |
| * |
| * @param listener |
| */ |
| public void registerFlushListener(FlushListener listener) { |
| customFlushListeners.add(listener); |
| } |
| |
| /** |
| * Add a listener to listen file close events. Notice that this addition only applies to |
| * TsFileProcessors created afterwards. |
| * |
| * @param listener |
| */ |
| public void registerCloseFileListener(CloseFileListener listener) { |
| customCloseFileListeners.add(listener); |
| } |
| |
| private void makeSureNoOldRegion(DataRegionId regionId) { |
| while (deletingDataRegionMap.containsKey(regionId)) { |
| DataRegion oldRegion = deletingDataRegionMap.get(regionId); |
| if (oldRegion != null) { |
| oldRegion.waitForDeleted(); |
| } |
| } |
| } |
| |
| // When registering a new region, the coordinator needs to register the corresponding region with |
| // the local storageengine before adding the corresponding consensusGroup to the consensus layer |
| public DataRegion createDataRegion(DataRegionId regionId, String sg, long ttl) |
| throws DataRegionException { |
| makeSureNoOldRegion(regionId); |
| AtomicReference<DataRegionException> exceptionAtomicReference = new AtomicReference<>(null); |
| DataRegion dataRegion = |
| dataRegionMap.computeIfAbsent( |
| regionId, |
| x -> { |
| try { |
| return buildNewDataRegion(sg, x, ttl); |
| } catch (DataRegionException e) { |
| exceptionAtomicReference.set(e); |
| } |
| return null; |
| }); |
| if (exceptionAtomicReference.get() != null) { |
| throw exceptionAtomicReference.get(); |
| } |
| return dataRegion; |
| } |
| |
  /**
   * Delete a data region and all of its on-disk artifacts (data files, system folder, and — under
   * IoT consensus — its WAL node and snapshots). The region is parked in deletingDataRegionMap
   * while deletion runs so that makeSureNoOldRegion() can wait on it.
   */
  public void deleteDataRegion(DataRegionId regionId) {
    // no-op when the region is unknown or a deletion is already in flight
    if (!dataRegionMap.containsKey(regionId) || deletingDataRegionMap.containsKey(regionId)) {
      return;
    }
    // atomically move the region from the live map to the deleting map
    DataRegion region =
        deletingDataRegionMap.computeIfAbsent(regionId, k -> dataRegionMap.remove(regionId));
    if (region != null) {
      region.markDeleted();
      // drop all per-region writing metrics
      WRITING_METRICS.removeDataRegionMemoryCostMetrics(regionId);
      WRITING_METRICS.removeFlushingMemTableStatusMetrics(regionId);
      WRITING_METRICS.removeActiveMemtableCounterMetrics(regionId);
      WRITING_METRICS.removeWalFlushMemTableCounterMetrics(regionId);
      WRITING_METRICS.removeTimedFlushMemTableCounterMetrics(regionId);
      WRITING_METRICS.removeSeriesFullFlushMemTableCounterMetrics(regionId);
      try {
        region.abortCompaction();
        region.syncDeleteDataFiles();
        region.deleteFolder(systemDir);
        if (CONFIG.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
          // delete wal
          WALManager.getInstance()
              .deleteWALNode(
                  region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionId());
          // delete snapshot
          for (String dataDir : CONFIG.getLocalDataDirs()) {
            File regionSnapshotDir =
                new File(
                    dataDir + File.separator + IoTDBConstant.SNAPSHOT_FOLDER_NAME,
                    region.getDatabaseName() + FILE_NAME_SEPARATOR + regionId.getId());
            if (regionSnapshotDir.exists()) {
              try {
                FileUtils.deleteDirectory(regionSnapshotDir);
              } catch (IOException e) {
                LOGGER.error("Failed to delete snapshot dir {}", regionSnapshotDir, e);
              }
            }
          }
        }
      } catch (Exception e) {
        LOGGER.error(
            "Error occurs when deleting data region {}-{}",
            region.getDatabaseName(),
            region.getDataRegionId(),
            e);
      } finally {
        // always unpark the id so waiters in makeSureNoOldRegion() can proceed
        deletingDataRegionMap.remove(regionId);
      }
    }
  }
| |
| /** |
| * run the runnable if the region is absent. if the region is present, do nothing. |
| * |
| * <p>we don't use computeIfAbsent because we don't want to create a new region if the region is |
| * absent, we just want to run the runnable in a synchronized way. |
| * |
| * @return true if the region is absent and the runnable is run. false if the region is present. |
| */ |
| public boolean runIfAbsent(DataRegionId regionId, Runnable runnable) { |
| final AtomicBoolean result = new AtomicBoolean(false); |
| dataRegionMap.computeIfAbsent( |
| regionId, |
| k -> { |
| runnable.run(); |
| result.set(true); |
| return null; |
| }); |
| return result.get(); |
| } |
| |
| /** |
| * run the consumer if the region is present. if the region is absent, do nothing. |
| * |
| * <p>we don't use computeIfPresent because we don't want to remove the region if the consumer |
| * returns null, we just want to run the consumer in a synchronized way. |
| * |
| * @return true if the region is present and the consumer is run. false if the region is absent. |
| */ |
| public boolean runIfPresent(DataRegionId regionId, Consumer<DataRegion> consumer) { |
| final AtomicBoolean result = new AtomicBoolean(false); |
| dataRegionMap.computeIfPresent( |
| regionId, |
| (id, region) -> { |
| consumer.accept(region); |
| result.set(true); |
| return region; |
| }); |
| return result.get(); |
| } |
| |
| public DataRegion getDataRegion(DataRegionId regionId) { |
| return dataRegionMap.get(regionId); |
| } |
| |
| public List<DataRegion> getAllDataRegions() { |
| return new ArrayList<>(dataRegionMap.values()); |
| } |
| |
| public List<DataRegionId> getAllDataRegionIds() { |
| return new ArrayList<>(dataRegionMap.keySet()); |
| } |
| |
| /** This method is not thread-safe */ |
| public void setDataRegion(DataRegionId regionId, DataRegion newRegion) { |
| if (dataRegionMap.containsKey(regionId)) { |
| DataRegion oldRegion = dataRegionMap.get(regionId); |
| oldRegion.syncCloseAllWorkingTsFileProcessors(); |
| oldRegion.abortCompaction(); |
| } |
| dataRegionMap.put(regionId, newRegion); |
| } |
| |
| public TSStatus setTTL(TSetTTLReq req) { |
| Map<String, List<DataRegionId>> localDataRegionInfo = |
| StorageEngine.getInstance().getLocalDataRegionInfo(); |
| List<DataRegionId> dataRegionIdList = new ArrayList<>(); |
| req.storageGroupPathPattern.forEach( |
| storageGroup -> dataRegionIdList.addAll(localDataRegionInfo.get(storageGroup))); |
| for (DataRegionId dataRegionId : dataRegionIdList) { |
| DataRegion dataRegion = dataRegionMap.get(dataRegionId); |
| if (dataRegion != null) { |
| dataRegion.setDataTTLWithTimePrecisionCheck(req.TTL); |
| } |
| } |
| return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); |
| } |
| |
  /** @return the flush policy shared by all regions built by this engine. */
  public TsFileFlushPolicy getFileFlushPolicy() {
    return fileFlushPolicy;
  }

  /**
   * Stage one piece of a TsFile being loaded into the given data region.
   *
   * @param dataRegionId target region
   * @param pieceNode the piece of the TsFile to write
   * @param uuid load-session id used to group pieces until the load command executes
   * @return SUCCESS, SYSTEM_READ_ONLY if the system is read-only, or LOAD_FILE_ERROR on I/O failure
   */
  public TSStatus writeLoadTsFileNode(
      DataRegionId dataRegionId, LoadTsFilePieceNode pieceNode, String uuid) {
    TSStatus status = new TSStatus();

    if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
      status.setCode(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
      status.setMessage(LoadReadOnlyException.MESSAGE);
      return status;
    }

    try {
      loadTsFileManager.writeToDataRegion(getDataRegion(dataRegionId), pieceNode, uuid);
    } catch (IOException e) {
      LOGGER.error(
          "IO error when writing piece node of TsFile {} to DataRegion {}.",
          pieceNode.getTsFile(),
          dataRegionId,
          e);
      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
      status.setMessage(e.getMessage());
      return status;
    }

    return RpcUtils.SUCCESS_STATUS;
  }
| |
  /**
   * Finish a load session: EXECUTE commits all staged pieces for the uuid, ROLLBACK discards
   * them.
   *
   * @param loadCommand EXECUTE or ROLLBACK
   * @param uuid load-session id previously used with writeLoadTsFileNode
   * @param isGeneratedByPipe whether the load originates from the pipe subsystem
   * @param progressIndex progress index to attach to the committed data
   * @return SUCCESS, LOAD_FILE_ERROR (unknown uuid or I/O failure), or ILLEGAL_PARAMETER
   */
  public TSStatus executeLoadCommand(
      LoadTsFileScheduler.LoadCommand loadCommand,
      String uuid,
      boolean isGeneratedByPipe,
      ProgressIndex progressIndex) {
    TSStatus status = new TSStatus();

    try {
      switch (loadCommand) {
        case EXECUTE:
          if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, progressIndex)) {
            status = RpcUtils.SUCCESS_STATUS;
          } else {
            // false means no staged pieces were found under this uuid
            status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
            status.setMessage(
                String.format(
                    "No load TsFile uuid %s recorded for execute load command %s.",
                    uuid, loadCommand));
          }
          break;
        case ROLLBACK:
          if (loadTsFileManager.deleteAll(uuid)) {
            status = RpcUtils.SUCCESS_STATUS;
          } else {
            status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
            status.setMessage(
                String.format(
                    "No load TsFile uuid %s recorded for execute load command %s.",
                    uuid, loadCommand));
          }
          break;
        default:
          status.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
          status.setMessage(String.format("Wrong load command %s.", loadCommand));
      }
    } catch (IOException | LoadFileException e) {
      LOGGER.error("Execute load command {} error.", loadCommand, e);
      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
      status.setMessage(e.getMessage());
    }

    return status;
  }

  /** reboot timed flush sequence/unsequence memetable thread */
  public void rebootTimedService() throws ShutdownException {
    LOGGER.info("Start rebooting all timed service.");

    // exclude ttl check thread
    stopTimedServiceAndThrow(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
    stopTimedServiceAndThrow(
        unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");

    LOGGER.info("Stop all timed service successfully, and now restart them.");

    startTimedService();

    LOGGER.info("Reboot all timed service successfully");
  }
| |
| private void stopTimedServiceAndThrow(ScheduledExecutorService pool, String poolName) |
| throws ShutdownException { |
| if (pool != null) { |
| pool.shutdownNow(); |
| try { |
| pool.awaitTermination(30, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| LOGGER.warn("{} still doesn't exit after 30s", poolName); |
| throw new ShutdownException(e); |
| } |
| } |
| } |
| |
| public void getDiskSizeByDataRegion( |
| Map<Integer, Long> dataRegionDisk, List<Integer> dataRegionIds) { |
| dataRegionMap.forEach( |
| (dataRegionId, dataRegion) -> { |
| if (dataRegionIds.contains(dataRegionId.getId())) { |
| dataRegionDisk.put(dataRegionId.getId(), dataRegion.countRegionDiskSize()); |
| } |
| }); |
| } |
| |
  /** Lazy-initialization holder idiom: INSTANCE is created on first getInstance() call. */
  static class InstanceHolder {

    private static final StorageEngine INSTANCE = new StorageEngine();

    private InstanceHolder() {
      // forbidding instantiation
    }
  }
| } |