| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.storageengine.dataregion.flush; |
| |
| import org.apache.iotdb.commons.service.metric.MetricService; |
| import org.apache.iotdb.commons.service.metric.enums.Metric; |
| import org.apache.iotdb.commons.service.metric.enums.Tag; |
| import org.apache.iotdb.commons.utils.CommonDateTimeUtils; |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.service.metrics.WritingMetrics; |
| import org.apache.iotdb.db.storageengine.dataregion.DataRegion; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.pool.FlushSubTaskPoolManager; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; |
| import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; |
| import org.apache.iotdb.metrics.utils.MetricLevel; |
| import org.apache.iotdb.tsfile.file.metadata.IDeviceID; |
| import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; |
| import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| |
/**
 * Flushes one memtable through a three-stage pipeline: sort the memtable -> encode the sorted
 * series -> write the encoded chunks to disk (io task).
 */
| public class MemTableFlushTask { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class); |
| private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = |
| FlushSubTaskPoolManager.getInstance(); |
| private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance(); |
| private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); |
| /* storage group name -> last time */ |
| private static final Map<String, Long> flushPointsCache = new ConcurrentHashMap<>(); |
| private final Future<?> encodingTaskFuture; |
| private final Future<?> ioTaskFuture; |
| private RestorableTsFileIOWriter writer; |
| |
| private final LinkedBlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>(); |
| private final LinkedBlockingQueue<Object> ioTaskQueue = |
| (SystemInfo.getInstance().isEncodingFasterThanIo()) |
| ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing()) |
| : new LinkedBlockingQueue<>(); |
| |
| private String storageGroup; |
| private String dataRegionId; |
| |
| private IMemTable memTable; |
| |
| private volatile long memSerializeTime = 0L; |
| private volatile long ioTime = 0L; |
| |
| /** |
| * @param memTable the memTable to flush |
| * @param writer the writer where memTable will be flushed to (current tsfile writer or vm writer) |
| * @param storageGroup current database |
| */ |
| public MemTableFlushTask( |
| IMemTable memTable, |
| RestorableTsFileIOWriter writer, |
| String storageGroup, |
| String dataRegionId) { |
| this.memTable = memTable; |
| this.writer = writer; |
| this.storageGroup = storageGroup; |
| this.dataRegionId = dataRegionId; |
| this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask); |
| this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask); |
| LOGGER.debug( |
| "flush task of database {} memtable is created, flushing to file {}.", |
| storageGroup, |
| writer.getFile().getName()); |
| } |
| |
| /** the function for flushing memtable. */ |
| @SuppressWarnings("squid:S3776") |
| public void syncFlushMemTable() throws ExecutionException, InterruptedException { |
| long avgSeriesPointsNum = |
| memTable.getSeriesNumber() == 0 |
| ? 0 |
| : memTable.getTotalPointsNum() / memTable.getSeriesNumber(); |
| WRITING_METRICS.recordFlushingMemTableStatus( |
| storageGroup, |
| memTable.memSize(), |
| memTable.getSeriesNumber(), |
| memTable.getTotalPointsNum(), |
| avgSeriesPointsNum); |
| |
| long estimatedTemporaryMemSize = 0L; |
| if (SystemInfo.getInstance().isEncodingFasterThanIo()) { |
| estimatedTemporaryMemSize = |
| memTable.getSeriesNumber() == 0 |
| ? 0 |
| : memTable.memSize() |
| / memTable.getSeriesNumber() |
| * config.getIoTaskQueueSizeForFlushing(); |
| SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize); |
| } |
| long start = System.currentTimeMillis(); |
| long sortTime = 0; |
| |
| // for map do not use get(key) to iterate |
| Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap(); |
| List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet()); |
| // sort the IDeviceID in lexicographical order |
| Collections.sort(deviceIDList); |
| for (IDeviceID deviceID : deviceIDList) { |
| final Map<String, IWritableMemChunk> value = memTableMap.get(deviceID).getMemChunkMap(); |
| // skip the empty device/chunk group |
| if (memTableMap.get(deviceID).count() == 0 || value.isEmpty()) { |
| continue; |
| } |
| encodingTaskQueue.put(new StartFlushGroupIOTask(deviceID)); |
| List<String> seriesInOrder = new ArrayList<>(value.keySet()); |
| Collections.sort(seriesInOrder); |
| for (String seriesId : seriesInOrder) { |
| long startTime = System.currentTimeMillis(); |
| IWritableMemChunk series = value.get(seriesId); |
| if (series.count() == 0) { |
| continue; |
| } |
| /* |
| * sort task (first task of flush pipeline) |
| */ |
| series.sortTvListForFlush(); |
| long subTaskTime = System.currentTimeMillis() - startTime; |
| sortTime += subTaskTime; |
| WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.SORT_TASK, subTaskTime); |
| encodingTaskQueue.put(series); |
| } |
| |
| encodingTaskQueue.put(new EndChunkGroupIoTask()); |
| } |
| encodingTaskQueue.put(new TaskEnd()); |
| LOGGER.debug( |
| "Database {} memtable flushing into file {}: data sort time cost {} ms.", |
| storageGroup, |
| writer.getFile().getName(), |
| sortTime); |
| WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_SORT, sortTime); |
| |
| try { |
| encodingTaskFuture.get(); |
| } catch (InterruptedException | ExecutionException e) { |
| ioTaskFuture.cancel(true); |
| if (e instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); |
| } |
| throw e; |
| } |
| |
| ioTaskFuture.get(); |
| |
| try { |
| long writePlanIndicesStartTime = System.currentTimeMillis(); |
| writer.writePlanIndices(); |
| WRITING_METRICS.recordFlushCost( |
| WritingMetrics.WRITE_PLAN_INDICES, |
| System.currentTimeMillis() - writePlanIndicesStartTime); |
| } catch (IOException e) { |
| throw new ExecutionException(e); |
| } |
| |
| if (estimatedTemporaryMemSize != 0) { |
| SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize); |
| } |
| SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime); |
| |
| MetricService.getInstance() |
| .timer( |
| System.currentTimeMillis() - start, |
| TimeUnit.MILLISECONDS, |
| Metric.COST_TASK.toString(), |
| MetricLevel.CORE, |
| Tag.NAME.toString(), |
| "flush"); |
| } |
| |
| /** encoding task (second task of pipeline) */ |
| private Runnable encodingTask = |
| new Runnable() { |
| |
| @SuppressWarnings("squid:S135") |
| @Override |
| public void run() { |
| LOGGER.debug( |
| "Database {} memtable flushing to file {} starts to encoding data.", |
| storageGroup, |
| writer.getFile().getName()); |
| while (true) { |
| |
| Object task; |
| try { |
| task = encodingTaskQueue.take(); |
| } catch (InterruptedException e1) { |
| LOGGER.error("Take task into ioTaskQueue Interrupted"); |
| Thread.currentThread().interrupt(); |
| break; |
| } |
| if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) { |
| try { |
| ioTaskQueue.put(task); |
| } catch ( |
| @SuppressWarnings("squid:S2142") |
| InterruptedException e) { |
| LOGGER.error( |
| "Database {} memtable flushing to file {}, encoding task is interrupted.", |
| storageGroup, |
| writer.getFile().getName(), |
| e); |
| // generally it is because the thread pool is shutdown so the task should be aborted |
| break; |
| } |
| } else if (task instanceof TaskEnd) { |
| break; |
| } else { |
| long starTime = System.currentTimeMillis(); |
| IWritableMemChunk writableMemChunk = (IWritableMemChunk) task; |
| IChunkWriter seriesWriter = writableMemChunk.createIChunkWriter(); |
| writableMemChunk.encode(seriesWriter); |
| seriesWriter.sealCurrentPage(); |
| seriesWriter.clearPageWriter(); |
| try { |
| ioTaskQueue.put(seriesWriter); |
| } catch (InterruptedException e) { |
| LOGGER.error("Put task into ioTaskQueue Interrupted"); |
| Thread.currentThread().interrupt(); |
| } |
| long subTaskTime = System.currentTimeMillis() - starTime; |
| WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime); |
| memSerializeTime += subTaskTime; |
| } |
| } |
| try { |
| ioTaskQueue.put(new TaskEnd()); |
| } catch (InterruptedException e) { |
| LOGGER.error("Put task into ioTaskQueue Interrupted"); |
| Thread.currentThread().interrupt(); |
| } |
| |
| DataRegion.getNonSystemDatabaseName(storageGroup) |
| .ifPresent( |
| databaseName -> |
| recordFlushPointsMetricInternal( |
| memTable.getTotalPointsNum(), databaseName, dataRegionId)); |
| WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING, memSerializeTime); |
| } |
| }; |
| |
| public static void recordFlushPointsMetricInternal( |
| long totalPointsNum, String storageGroupName, String dataRegionId) { |
| long currentTime = CommonDateTimeUtils.currentTime(); |
| // compute the flush points |
| long writeTime = |
| flushPointsCache.compute( |
| storageGroupName, |
| (storageGroup, lastTime) -> { |
| if (lastTime == null || lastTime != currentTime) { |
| return currentTime; |
| } else { |
| return currentTime + 1; |
| } |
| }); |
| // record the flush points |
| MetricService.getInstance() |
| .gaugeWithInternalReportAsync( |
| totalPointsNum, |
| Metric.POINTS.toString(), |
| MetricLevel.CORE, |
| writeTime, |
| Tag.DATABASE.toString(), |
| storageGroupName, |
| Tag.TYPE.toString(), |
| "flush", |
| Tag.REGION.toString(), |
| dataRegionId); |
| } |
| |
| /** io task (third task of pipeline) */ |
| @SuppressWarnings("squid:S135") |
| private Runnable ioTask = |
| () -> { |
| LOGGER.debug( |
| "Database {} memtable flushing to file {} start io.", |
| storageGroup, |
| writer.getFile().getName()); |
| while (true) { |
| Object ioMessage = null; |
| try { |
| ioMessage = ioTaskQueue.take(); |
| } catch (InterruptedException e1) { |
| LOGGER.error("take task from ioTaskQueue Interrupted"); |
| Thread.currentThread().interrupt(); |
| break; |
| } |
| long starTime = System.currentTimeMillis(); |
| try { |
| if (ioMessage instanceof StartFlushGroupIOTask) { |
| this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId); |
| } else if (ioMessage instanceof TaskEnd) { |
| break; |
| } else if (ioMessage instanceof EndChunkGroupIoTask) { |
| this.writer.setMinPlanIndex(memTable.getMinPlanIndex()); |
| this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex()); |
| this.writer.endChunkGroup(); |
| } else { |
| ((IChunkWriter) ioMessage).writeToFileWriter(this.writer); |
| } |
| } catch (IOException e) { |
| LOGGER.error( |
| "Database {} memtable {}, io task meets error.", storageGroup, memTable, e); |
| return; |
| } |
| long subTaskTime = System.currentTimeMillis() - starTime; |
| ioTime += subTaskTime; |
| WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.IO_TASK, subTaskTime); |
| } |
| LOGGER.debug( |
| "flushing a memtable to file {} in database {}, io cost {}ms", |
| writer.getFile().getName(), |
| storageGroup, |
| ioTime); |
| WRITING_METRICS.recordFlushTsFileSize(storageGroup, writer.getFile().length()); |
| WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_IO, ioTime); |
| }; |
| |
| static class TaskEnd { |
| |
| TaskEnd() {} |
| } |
| |
| static class EndChunkGroupIoTask { |
| |
| EndChunkGroupIoTask() {} |
| } |
| |
| static class StartFlushGroupIOTask { |
| |
| private final IDeviceID deviceId; |
| |
| StartFlushGroupIOTask(IDeviceID deviceId) { |
| this.deviceId = deviceId; |
| } |
| } |
| } |