blob: 4060eb97bb280c7f669ab167e89ae9269722b888 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * {@link ILastFlushTimeMap} implementation backed by {@link ConcurrentHashMap}s.
 *
 * <p>It keeps three maps: per-time-partition flush times, a global per-device flush time, and the
 * estimated memory cost accounted to each time partition.
 */
public class HashLastFlushTimeMap implements ILastFlushTimeMap {

  private static final Logger logger = LoggerFactory.getLogger(HashLastFlushTimeMap.class);

  /**
   * Byte estimate used as part of {@link #HASHMAP_NODE_BASIC_SIZE}. NOTE(review): presumably the
   * estimated heap footprint of one boxed timestamp - confirm.
   */
  static final long LONG_SIZE = 24;

  /**
   * Fixed part of the memory charged for each newly tracked device; the device id's own {@code
   * ramBytesUsed()} is added on top of it.
   */
  static final long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE;

  /**
   * time partition id -> map, which contains device -> largest timestamp of the latest memtable to
   * be submitted to asyncTryToFlush. partitionLatestFlushedTimeForEachDevice determines whether a
   * data point should be put into a sequential file or an unsequential file. Data of some device
   * with timestamp less than or equal to the device's latestFlushedTime should go into an
   * unsequential file.
   *
   * <p>It is used to separate sequence and unsequence data.
   */
  private final Map<Long, ILastFlushTime> partitionLatestFlushedTime = new ConcurrentHashMap<>();

  /**
   * global mapping of device -> largest timestamp of the latest memtable to be submitted to
   * asyncTryToFlush. globalLatestFlushedTimeForEachDevice is utilized to maintain the global
   * latestFlushedTime of devices and will be updated along with
   * partitionLatestFlushedTimeForEachDevice.
   *
   * <p>It is used to update last cache.
   */
  private final Map<IDeviceID, Long> globalLatestFlushedTimeForEachDevice =
      new ConcurrentHashMap<>();

  /** record memory cost of map for each partitionId */
  private final Map<Long, Long> memCostForEachPartition = new ConcurrentHashMap<>();

  /**
   * For load: updates the flush time of one device in one time partition, creating the partition
   * entry if it does not exist yet.
   *
   * <p>If the partition currently reports {@code Long.MIN_VALUE} for the device (treated here as
   * "not tracked yet"), the memory cost of the partition is increased by {@link
   * #HASHMAP_NODE_BASIC_SIZE} plus the device id's {@code ramBytesUsed()}.
   *
   * @param timePartitionId id of the time partition
   * @param deviceId device whose flush time is updated
   * @param time flush time handed to the partition's {@code updateLastFlushTime}
   */
  @Override
  public void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID deviceId, long time) {
    ILastFlushTime flushTimeMapForPartition =
        partitionLatestFlushedTime.computeIfAbsent(
            timePartitionId, id -> new DeviceLastFlushTime());
    if (flushTimeMapForPartition.getLastFlushTime(deviceId) == Long.MIN_VALUE) {
      long memCost = HASHMAP_NODE_BASIC_SIZE + deviceId.ramBytesUsed();
      // Atomically add to the existing cost, or start from memCost if none is recorded.
      memCostForEachPartition.merge(timePartitionId, memCost, Long::sum);
    }
    flushTimeMapForPartition.updateLastFlushTime(deviceId, time);
  }

  /**
   * For recover: updates the flush times of several devices in one time partition, creating the
   * partition entry if it does not exist yet.
   *
   * <p>The memory cost of the partition grows by one node estimate for every device that the
   * partition reported {@code Long.MIN_VALUE} for before the update. A memory-cost entry is
   * written even when the increment is zero.
   *
   * @param timePartitionId id of the time partition
   * @param flushedTimeMap device -> flush time to apply
   */
  @Override
  public void updateMultiDeviceFlushedTime(
      long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
    ILastFlushTime flushTimeMapForPartition =
        partitionLatestFlushedTime.computeIfAbsent(
            timePartitionId, id -> new DeviceLastFlushTime());
    long memIncr = 0;
    for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
      IDeviceID deviceId = entry.getKey();
      if (flushTimeMapForPartition.getLastFlushTime(deviceId) == Long.MIN_VALUE) {
        memIncr += HASHMAP_NODE_BASIC_SIZE + deviceId.ramBytesUsed();
      }
      flushTimeMapForPartition.updateLastFlushTime(deviceId, entry.getValue());
    }
    memCostForEachPartition.merge(timePartitionId, memIncr, Long::sum);
  }

  /**
   * Raises the global flush time of one device to {@code time} if {@code time} is larger than the
   * recorded value, or records {@code time} if the device has no value yet.
   *
   * @param path device id
   * @param time candidate flush time
   */
  @Override
  public void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time) {
    globalLatestFlushedTimeForEachDevice.merge(path, time, Math::max);
  }

  /**
   * Applies {@link #updateOneDeviceGlobalFlushedTime} semantics (keep the maximum) to every entry
   * of the given map.
   *
   * @param globalFlushedTimeMap device -> candidate flush time
   */
  @Override
  public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> globalFlushedTimeMap) {
    for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
      globalLatestFlushedTimeForEachDevice.merge(entry.getKey(), entry.getValue(), Math::max);
    }
  }

  /**
   * Ensures that an entry exists for the given time partition.
   *
   * @param timePartitionId id of the time partition
   * @return {@code true} if the partition entry already existed, {@code false} if this call
   *     created it
   */
  @Override
  public boolean checkAndCreateFlushedTimePartition(long timePartitionId) {
    // Fast path: avoid allocating a DeviceLastFlushTime when the partition is already there.
    if (partitionLatestFlushedTime.containsKey(timePartitionId)) {
      return true;
    }
    // putIfAbsent is atomic, so a concurrent creator's instance is never overwritten and
    // exactly one caller observes "created" (a null previous value).
    return partitionLatestFlushedTime.putIfAbsent(timePartitionId, new DeviceLastFlushTime())
        != null;
  }

  /**
   * For insert: updates the partition-level flush time of every device in {@code updateMap} and
   * raises the corresponding global flush time to at least the same value.
   *
   * <p>This method does not touch the per-partition memory cost.
   *
   * @param partitionId id of the time partition
   * @param updateMap device -> flush time to apply
   */
  @Override
  public void updateLatestFlushTime(long partitionId, Map<IDeviceID, Long> updateMap) {
    if (updateMap.isEmpty()) {
      // Nothing to apply; in particular do not create an empty partition entry.
      return;
    }
    ILastFlushTime flushTimeMapForPartition =
        partitionLatestFlushedTime.computeIfAbsent(partitionId, id -> new DeviceLastFlushTime());
    for (Map.Entry<IDeviceID, Long> entry : updateMap.entrySet()) {
      flushTimeMapForPartition.updateLastFlushTime(entry.getKey(), entry.getValue());
      // Atomic "keep the maximum", so a concurrent writer cannot replace a larger timestamp
      // with a smaller one.
      globalLatestFlushedTimeForEachDevice.merge(entry.getKey(), entry.getValue(), Math::max);
    }
  }

  /**
   * Returns the flush time of a device within a time partition.
   *
   * @param timePartitionId id of the time partition
   * @param deviceId device id
   * @return the value reported by the partition's {@code getLastFlushTime}, or {@code
   *     Long.MIN_VALUE} if no entry exists for the partition
   */
  @Override
  public long getFlushedTime(long timePartitionId, IDeviceID deviceId) {
    ILastFlushTime flushTimeMapForPartition = partitionLatestFlushedTime.get(timePartitionId);
    if (flushTimeMapForPartition == null) {
      // Unknown (or already cleared) partition: report the same "nothing recorded" sentinel
      // that getGlobalFlushedTime uses instead of failing with a NullPointerException.
      return Long.MIN_VALUE;
    }
    return flushTimeMapForPartition.getLastFlushTime(deviceId);
  }

  /**
   * Returns the global flush time of a device.
   *
   * @param path device id
   * @return the recorded global flush time, or {@code Long.MIN_VALUE} if none is recorded
   */
  @Override
  public long getGlobalFlushedTime(IDeviceID path) {
    return globalLatestFlushedTimeForEachDevice.getOrDefault(path, Long.MIN_VALUE);
  }

  /**
   * Removes all per-partition flush times.
   *
   * <p>NOTE(review): the per-partition memory cost map is not cleared here, so {@link
   * #getMemSize} keeps returning the previously accumulated values - confirm this is intended.
   */
  @Override
  public void clearFlushedTime() {
    partitionLatestFlushedTime.clear();
  }

  /** Removes all global per-device flush times. */
  @Override
  public void clearGlobalFlushedTime() {
    globalLatestFlushedTimeForEachDevice.clear();
  }

  /**
   * Replaces the entry of the given partition, if present, with the result of its {@code
   * degradeLastFlushTime()} and sets the partition's memory cost to {@code Long.BYTES}.
   *
   * <p>The memory cost is written even when no entry exists for the partition.
   *
   * @param partitionId id of the time partition
   */
  @Override
  public void degradeLastFlushTime(long partitionId) {
    partitionLatestFlushedTime.computeIfPresent(
        partitionId, (id, lastFlushTime) -> lastFlushTime.degradeLastFlushTime());
    memCostForEachPartition.put(partitionId, (long) Long.BYTES);
  }

  /**
   * Returns the memory cost currently accounted to a time partition.
   *
   * @param partitionId id of the time partition
   * @return the recorded memory cost in bytes, or {@code 0} if none is recorded
   */
  @Override
  public long getMemSize(long partitionId) {
    // Single lookup: avoids unboxing null if the entry vanishes between two separate calls.
    return memCostForEachPartition.getOrDefault(partitionId, 0L);
  }
}