// blob: 81dbd816ddb3e68647042a59a783a0a2e0d259b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memory;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.schemaengine.metric.SchemaEngineCachedMetric;
import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.CachedMTreeStore;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.flush.Scheduler;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.IReleaseFlushStrategy;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategyNumBasedImpl;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.memcontrol.ReleaseFlushStrategySizeBasedImpl;
import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* All instances of CachedMTreeStoreCacheMemoryManager shall be registered in ReleaseFlushMonitor.
* ReleaseFlushMonitor provides the {@link ReleaseFlushMonitor#ensureMemoryStatus} interface, which
* starts asynchronous threads to free and flush the disk when memory usage exceeds a threshold.
*/
public class ReleaseFlushMonitor {
private static final Logger logger = LoggerFactory.getLogger(ReleaseFlushMonitor.class);
/** configuration */
private static final double FREE_FLUSH_PROPORTION = 0.2;
private static final int MONITOR_INETRVAL_MILLISECONDS = 5000;
private static final int MAX_WAITING_TIME_WHEN_RELEASING = 3_000;
/** data structure */
private final Map<Integer, RecordList> regionToTraverserTime = new ConcurrentHashMap<>();
private final Map<Integer, CachedMTreeStore> regionToStoreMap = new ConcurrentHashMap<>();
private final Set<Integer> flushingRegionSet = new CopyOnWriteArraySet<>();
private CachedSchemaEngineStatistics engineStatistics;
private SchemaEngineCachedMetric engineMetric;
private IReleaseFlushStrategy releaseFlushStrategy;
/** thread and lock */
private final Object blockObject = new Object();
private ScheduledExecutorService flushMonitor;
private ExecutorService releaseMonitor;
private FiniteSemaphore releaseSemaphore;
private Scheduler scheduler;
public void registerCachedMTreeStore(CachedMTreeStore store) {
regionToStoreMap.put(store.getRegionStatistics().getSchemaRegionId(), store);
regionToTraverserTime.put(store.getRegionStatistics().getSchemaRegionId(), new RecordList());
}
public void clearCachedMTreeStore(CachedMTreeStore store) {
regionToStoreMap.remove(store.getRegionStatistics().getSchemaRegionId());
regionToTraverserTime.remove(store.getRegionStatistics().getSchemaRegionId());
}
public void init(ISchemaEngineStatistics engineStatistics) {
releaseSemaphore = new FiniteSemaphore(2, 0);
this.engineStatistics = engineStatistics.getAsCachedSchemaEngineStatistics();
if (IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInPBTreeMode() >= 0) {
releaseFlushStrategy = new ReleaseFlushStrategyNumBasedImpl(this.engineStatistics);
} else {
releaseFlushStrategy = new ReleaseFlushStrategySizeBasedImpl(this.engineStatistics);
}
scheduler = new Scheduler(regionToStoreMap, flushingRegionSet, releaseFlushStrategy);
releaseMonitor =
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.PBTREE_RELEASE_MONITOR.getName());
flushMonitor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.PBTREE_FLUSH_MONITOR.getName());
releaseMonitor.submit(
() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
releaseSemaphore.acquire();
// 1. first, it will try to release node cache
if (releaseFlushStrategy.isExceedReleaseThreshold()) {
scheduler.scheduleRelease(false);
// 2. if it still exceeds release threshold, it will try to flush node buffer, then
// release node cache again
if (releaseFlushStrategy.isExceedReleaseThreshold()) {
scheduler.scheduleFlushAll();
regionToTraverserTime.values().forEach(RecordList::clear);
}
synchronized (blockObject) {
// invoke the notifyAll() method to wake up the thread waiting for the release
blockObject.notifyAll();
}
}
}
} catch (InterruptedException e) {
logger.info("ReleaseTaskMonitor thread is interrupted.");
Thread.currentThread().interrupt();
}
});
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
flushMonitor,
() -> {
if (releaseFlushStrategy.isExceedReleaseThreshold()) {
releaseSemaphore.release();
} else {
scheduler.scheduleFlush(getRegionsToFlush(System.currentTimeMillis()));
}
},
MONITOR_INETRVAL_MILLISECONDS,
MONITOR_INETRVAL_MILLISECONDS,
TimeUnit.MILLISECONDS);
}
public void setEngineMetric(SchemaEngineCachedMetric engineMetric) {
this.engineMetric = engineMetric;
}
/**
* Check the current memory usage. If the release threshold is exceeded, trigger the task to
* perform an internal and external memory swap to release the memory.
*/
public void ensureMemoryStatus() {
if (releaseFlushStrategy.isExceedReleaseThreshold()) {
releaseSemaphore.release();
}
}
/**
* If there is a ReleaseTask or FlushTask, block the current thread to wait up to
* MAX_WAITING_TIME_WHEN_RELEASING. The thread will be woken up if the ReleaseTask or FlushTask
* ends or the wait time exceeds MAX_WAITING_TIME_WHEN_RELEASING.
*/
public void waitIfReleasing() {
synchronized (blockObject) {
try {
blockObject.wait(MAX_WAITING_TIME_WHEN_RELEASING);
} catch (InterruptedException e) {
logger.warn(
"Interrupt because the release task and flush task did not finish within {} milliseconds.",
MAX_WAITING_TIME_WHEN_RELEASING);
Thread.currentThread().interrupt();
}
}
}
public RecordNode recordTraverserTime(int regionId) {
return regionToTraverserTime.get(regionId).createAndAddToTail();
}
@TestOnly
public void initRecordList(int regionId) {
regionToTraverserTime.computeIfAbsent(regionId, k -> new RecordList());
}
public List<Integer> getRegionsToFlush(long windowsEndTime) {
long windowsStartTime = windowsEndTime - MONITOR_INETRVAL_MILLISECONDS;
List<Pair<Integer, Long>> regionAndFreeTimeList = new ArrayList<>();
for (Map.Entry<Integer, RecordList> entry : regionToTraverserTime.entrySet()) {
int regionId = entry.getKey();
long traverserEndTime = windowsStartTime;
long traverserFreeTime = 0;
RecordList recordList = entry.getValue();
Iterator<RecordNode> iterator = recordList.iterator();
while (iterator.hasNext()) {
RecordNode recordNode = iterator.next();
if (recordNode.startTime > windowsEndTime) {
break;
}
if (recordNode.startTime > traverserEndTime) {
traverserFreeTime += (recordNode.startTime - traverserEndTime);
traverserEndTime = recordNode.endTime;
} else if (recordNode.endTime > traverserEndTime) {
traverserEndTime = recordNode.endTime;
}
if (recordNode.endTime < windowsStartTime) {
iterator.remove();
} else if (recordNode.endTime >= windowsEndTime) {
break;
}
}
if (traverserEndTime < windowsEndTime) {
traverserFreeTime += (windowsEndTime - traverserEndTime);
}
if (traverserFreeTime > FREE_FLUSH_PROPORTION * MONITOR_INETRVAL_MILLISECONDS) {
regionAndFreeTimeList.add(new Pair<>(regionId, traverserFreeTime));
}
}
regionAndFreeTimeList.sort(Comparator.comparing((Pair<Integer, Long> o) -> o.right).reversed());
return regionAndFreeTimeList.stream().map(Pair::getLeft).collect(Collectors.toList());
}
@TestOnly
public void forceFlushAndRelease() {
boolean needFlush;
while (true) {
needFlush = false;
for (CachedMTreeStore store : regionToStoreMap.values()) {
if (store.getMemoryManager().getBufferNodeNum() > 0) {
needFlush = true;
break;
}
}
if (needFlush) {
scheduler.scheduleFlushAll().join();
scheduler.scheduleRelease(true);
} else {
break;
}
}
}
public void clear() {
if (releaseMonitor != null) {
releaseMonitor.shutdownNow();
while (true) {
if (releaseMonitor.isTerminated()) break;
}
releaseMonitor = null;
}
if (flushMonitor != null) {
flushMonitor.shutdownNow();
while (true) {
if (flushMonitor.isTerminated()) break;
}
flushMonitor = null;
}
if (scheduler != null) {
scheduler.clear();
while (true) {
if (scheduler.isTerminated()) break;
}
scheduler = null;
}
regionToStoreMap.clear();
flushingRegionSet.clear();
regionToTraverserTime.clear();
releaseFlushStrategy = null;
engineStatistics = null;
releaseSemaphore = null;
engineMetric = null;
}
public int getActiveWorkerNum() {
return scheduler.getActiveWorkerNum();
}
private ReleaseFlushMonitor() {}
private static class ReleaseFlushMonitorHolder {
private static final ReleaseFlushMonitor INSTANCE = new ReleaseFlushMonitor();
private ReleaseFlushMonitorHolder() {}
}
public static ReleaseFlushMonitor getInstance() {
return ReleaseFlushMonitor.ReleaseFlushMonitorHolder.INSTANCE;
}
@NotThreadSafe
private static class RecordList {
// The start time of RecordNode is incremental from head to tail
private final RecordNode head = new RecordNode();
private final RecordNode tail = new RecordNode();
private RecordList() {
head.next = tail;
tail.prev = head;
}
private synchronized RecordNode createAndAddToTail() {
RecordNode recordNode = new RecordNode();
recordNode.prev = tail.prev;
recordNode.next = tail;
tail.prev.next = recordNode;
tail.prev = recordNode;
return recordNode;
}
private synchronized void remove(RecordNode recordNode) {
recordNode.prev.next = recordNode.next;
recordNode.next.prev = recordNode.prev;
recordNode.prev = null;
recordNode.next = null;
}
private synchronized void clear() {
head.next = tail;
tail.prev = head;
}
private Iterator<RecordNode> iterator() {
return new Iterator<RecordNode>() {
private RecordNode next = null;
private RecordNode cur = head;
@Override
public boolean hasNext() {
if (next == null && cur.next != tail) {
next = cur.next;
}
return next != null;
}
@Override
public RecordNode next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
cur = next;
next = null;
return cur;
}
@Override
public void remove() {
if (next == null && cur.next != tail) {
next = cur.next;
}
RecordList.this.remove(cur);
}
};
}
}
public static class RecordNode {
private RecordNode prev = null;
private RecordNode next = null;
private Long startTime = System.currentTimeMillis();
private Long endTime = Long.MAX_VALUE;
@TestOnly
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
}
}