blob: 758786b5448943ccf7539e70a9944e75110538e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.memtable;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.FlushStatus;
import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public abstract class AbstractMemTable implements IMemTable {
/** Each memTable has a unique long identifier; the counter is initialized when recovering WAL. */
public static final AtomicLong memTableIdCounter = new AtomicLong(-1);
private static final Logger logger = LoggerFactory.getLogger(AbstractMemTable.class);
private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + 2 * Integer.BYTES + 6 * Long.BYTES;
private static final DeviceIDFactory deviceIDFactory = DeviceIDFactory.getInstance();
/** DeviceId -> chunkGroup(MeasurementId -> chunk) */
private final Map<IDeviceID, IWritableMemChunkGroup> memTableMap;
/**
* The initial value is true because we want to calculate the text data size when recovering the
* memTable.
*/
protected boolean disableMemControl = true;
private boolean shouldFlush = false;
private volatile FlushStatus flushStatus = FlushStatus.WORKING;
private final int avgSeriesPointNumThreshold =
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
/** memory size of data points, including TEXT values */
private long memSize = 0;
/**
* Memory usage of all TVLists, regardless of whether these TVLists are full, including TEXT
* values
*/
private long tvListRamCost = 0;
private int seriesNumber = 0;
private long totalPointsNum = 0;
private long totalPointsNumThreshold = 0;
private long maxPlanIndex = Long.MIN_VALUE;
private long minPlanIndex = Long.MAX_VALUE;
private final long memTableId = memTableIdCounter.incrementAndGet();
private final long createdTime = System.currentTimeMillis();
private static final String METRIC_POINT_IN = "pointsIn";
/** Creates an empty memtable backed by a new {@link HashMap}. */
public AbstractMemTable() {
  this(new HashMap<>());
}
/**
 * Creates a memtable on top of an existing device map.
 *
 * @param memTableMap device id to chunk group mapping; stored as-is, without copying
 */
public AbstractMemTable(Map<IDeviceID, IWritableMemChunkGroup> memTableMap) {
  this.memTableMap = memTableMap;
}
/**
 * @return the internal device id to chunk group map (the live instance, not a copy)
 */
@Override
public Map<IDeviceID, IWritableMemChunkGroup> getMemTableMap() {
  return this.memTableMap;
}
/**
 * Returns the chunk group registered for the device, registering a new {@link
 * WritableMemChunkGroup} first if the device has none, and accounts for every measurement of
 * {@code schemaList} that the group does not contain yet.
 *
 * @param deviceId device id
 * @param schemaList measurement schemas; {@code null} entries are skipped
 * @return the chunk group of the device
 */
private IWritableMemChunkGroup createMemChunkGroupIfNotExistAndGet(
    IDeviceID deviceId, List<IMeasurementSchema> schemaList) {
  final IWritableMemChunkGroup group =
      memTableMap.computeIfAbsent(deviceId, unused -> new WritableMemChunkGroup());
  // every schema the group does not know yet is a new series and raises the point threshold
  final long newSeries =
      schemaList.stream()
          .filter(Objects::nonNull)
          .filter(schema -> !group.contains(schema.getMeasurementId()))
          .count();
  seriesNumber += newSeries;
  totalPointsNumThreshold += newSeries * avgSeriesPointNumThreshold;
  return group;
}
/**
 * Returns the aligned chunk group registered for the device, creating an {@link
 * AlignedWritableMemChunkGroup} from the non-null schemas first if the device has none.
 *
 * <p>Series accounting only considers non-null schemas: on creation the counters grow by the
 * number of non-null schemas handed to the new group, and for an existing group they grow by one
 * for every non-null schema the group does not contain yet.
 *
 * @param deviceId device id
 * @param schemaList measurement schemas; {@code null} entries are placeholders and are ignored
 * @return the chunk group of the device
 */
private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet(
    IDeviceID deviceId, List<IMeasurementSchema> schemaList) {
  IWritableMemChunkGroup memChunkGroup =
      memTableMap.computeIfAbsent(
          deviceId,
          k -> {
            List<IMeasurementSchema> validSchemas =
                schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList());
            // count only the series that are really created; null placeholders are not series
            seriesNumber += validSchemas.size();
            totalPointsNumThreshold += ((long) avgSeriesPointNumThreshold) * validSchemas.size();
            return new AlignedWritableMemChunkGroup(validSchemas);
          });
  // for a pre-existing group, account for the measurements it has not seen before
  for (IMeasurementSchema schema : schemaList) {
    if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) {
      seriesNumber++;
      totalPointsNumThreshold += avgSeriesPointNumThreshold;
    }
  }
  return memChunkGroup;
}
/**
 * Writes one non-aligned row into this memtable.
 *
 * <p>Columns whose measurement name is {@code null} (failed partial insert) or whose value is
 * {@code null} are forwarded as {@code null} schema placeholders. The inserted-point count is
 * the column count minus the plan's failed-measurement number minus the null-valued columns.
 *
 * @param insertRowPlan the row to insert; its device id is filled in when absent
 */
@Override
public void insert(InsertRowPlan insertRowPlan) {
  // plans not coming from the storage engine (mainly tests) carry no device id yet
  if (insertRowPlan.getDeviceID() == null) {
    insertRowPlan.setDeviceID(deviceIDFactory.getDeviceID(insertRowPlan.getDevicePath()));
  }
  updatePlanIndexes(insertRowPlan.getIndex());

  final String[] measurementNames = insertRowPlan.getMeasurements();
  final Object[] rowValues = insertRowPlan.getValues();
  final int columnCount = measurementNames.length;
  final List<IMeasurementSchema> schemas = new ArrayList<>();
  final List<TSDataType> types = new ArrayList<>();
  int nullValueCount = 0;
  for (int col = 0; col < columnCount; col++) {
    final boolean failedMeasurement = measurementNames[col] == null;
    if (failedMeasurement || rowValues[col] == null) {
      schemas.add(null);
      if (!failedMeasurement) {
        // a valid measurement with a null value is tracked separately from failed ones
        nullValueCount++;
      }
      continue;
    }
    final IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[col].getSchema();
    schemas.add(schema);
    types.add(schema.getType());
  }

  memSize += MemUtils.getRecordsSize(types, rowValues, disableMemControl);
  write(insertRowPlan.getDeviceID(), schemas, insertRowPlan.getTime(), rowValues);

  final int pointsInserted =
      columnCount - insertRowPlan.getFailedMeasurementNumber() - nullValueCount;
  totalPointsNum += pointsInserted;
  MetricService.getInstance()
      .count(
          pointsInserted,
          Metric.QUANTITY.toString(),
          MetricLevel.IMPORTANT,
          Tag.NAME.toString(),
          METRIC_POINT_IN);
}
/**
 * Writes one non-aligned row described by an {@link InsertRowNode} into this memtable.
 *
 * <p>Columns whose measurement name is {@code null} (failed partial insert) or whose value is
 * {@code null} are forwarded as {@code null} schema placeholders. The inserted-point count is
 * the column count minus the node's failed-measurement number minus the null-valued columns.
 *
 * @param insertRowNode the row to insert; its device id is filled in when absent
 */
@Override
public void insert(InsertRowNode insertRowNode) {
  // nodes not coming from the storage engine (mainly tests) carry no device id yet
  if (insertRowNode.getDeviceID() == null) {
    insertRowNode.setDeviceID(
        DeviceIDFactory.getInstance().getDeviceID(insertRowNode.getDevicePath()));
  }
  // TODO: use the node's own index once available: updatePlanIndexes(insertRowNode.getIndex())
  updatePlanIndexes(0);

  final String[] measurementNames = insertRowNode.getMeasurements();
  final Object[] rowValues = insertRowNode.getValues();
  final int columnCount = measurementNames.length;
  final List<IMeasurementSchema> schemas = new ArrayList<>();
  final List<TSDataType> types = new ArrayList<>();
  int nullValueCount = 0;
  for (int col = 0; col < columnCount; col++) {
    final boolean failedMeasurement = measurementNames[col] == null;
    if (failedMeasurement || rowValues[col] == null) {
      schemas.add(null);
      if (!failedMeasurement) {
        // a valid measurement with a null value is tracked separately from failed ones
        nullValueCount++;
      }
      continue;
    }
    final IMeasurementSchema schema = insertRowNode.getMeasurementSchemas()[col];
    schemas.add(schema);
    types.add(schema.getType());
  }

  memSize += MemUtils.getRecordsSize(types, rowValues, disableMemControl);
  write(insertRowNode.getDeviceID(), schemas, insertRowNode.getTime(), rowValues);

  final int pointsInserted =
      columnCount - insertRowNode.getFailedMeasurementNumber() - nullValueCount;
  totalPointsNum += pointsInserted;
  MetricService.getInstance()
      .count(
          pointsInserted,
          Metric.QUANTITY.toString(),
          MetricLevel.IMPORTANT,
          Tag.NAME.toString(),
          METRIC_POINT_IN);
}
/**
 * Writes one aligned row into this memtable.
 *
 * <p>Columns whose measurement name is {@code null} (failed partial insert) are forwarded as
 * {@code null} schema placeholders. The inserted-point count is the column count minus the
 * plan's failed-measurement number.
 *
 * @param insertRowPlan the aligned row to insert; its device id is filled in when absent
 */
@Override
public void insertAlignedRow(InsertRowPlan insertRowPlan) {
  // plans not coming from the storage engine carry no device id yet
  if (insertRowPlan.getDeviceID() == null) {
    insertRowPlan.setDeviceID(deviceIDFactory.getDeviceID(insertRowPlan.getDevicePath()));
  }
  updatePlanIndexes(insertRowPlan.getIndex());

  final String[] measurementNames = insertRowPlan.getMeasurements();
  final Object[] rowValues = insertRowPlan.getValues();
  final int columnCount = measurementNames.length;
  final List<IMeasurementSchema> schemas = new ArrayList<>();
  final List<TSDataType> types = new ArrayList<>();
  for (int col = 0; col < columnCount; col++) {
    if (measurementNames[col] != null) {
      final IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[col].getSchema();
      schemas.add(schema);
      types.add(schema.getType());
    } else {
      // failed partial insert: keep the position with a placeholder
      schemas.add(null);
    }
  }
  // placeholders are added for failed columns too, so this only triggers for a row without columns
  if (schemas.isEmpty()) {
    return;
  }

  memSize += MemUtils.getAlignedRecordsSize(types, rowValues, disableMemControl);
  writeAlignedRow(insertRowPlan.getDeviceID(), schemas, insertRowPlan.getTime(), rowValues);

  final int pointsInserted = columnCount - insertRowPlan.getFailedMeasurementNumber();
  totalPointsNum += pointsInserted;
  MetricService.getInstance()
      .count(
          pointsInserted,
          Metric.QUANTITY.toString(),
          MetricLevel.IMPORTANT,
          Tag.NAME.toString(),
          METRIC_POINT_IN);
}
/**
 * Writes one aligned row described by an {@link InsertRowNode} into this memtable.
 *
 * <p>Columns whose measurement name is {@code null} (failed partial insert) are forwarded as
 * {@code null} schema placeholders. The inserted-point count is the column count minus the
 * node's failed-measurement number.
 *
 * @param insertRowNode the aligned row to insert; its device id is filled in when absent
 */
@Override
public void insertAlignedRow(InsertRowNode insertRowNode) {
  // nodes not coming from the storage engine carry no device id yet
  if (insertRowNode.getDeviceID() == null) {
    insertRowNode.setDeviceID(deviceIDFactory.getDeviceID(insertRowNode.getDevicePath()));
  }
  // TODO updatePlanIndexes(insertRowNode.getIndex());
  updatePlanIndexes(0);

  final String[] measurementNames = insertRowNode.getMeasurements();
  final Object[] rowValues = insertRowNode.getValues();
  final int columnCount = measurementNames.length;
  final List<IMeasurementSchema> schemas = new ArrayList<>();
  final List<TSDataType> types = new ArrayList<>();
  for (int col = 0; col < columnCount; col++) {
    if (measurementNames[col] != null) {
      final IMeasurementSchema schema = insertRowNode.getMeasurementSchemas()[col];
      schemas.add(schema);
      types.add(schema.getType());
    } else {
      // failed partial insert: keep the position with a placeholder
      schemas.add(null);
    }
  }
  // placeholders are added for failed columns too, so this only triggers for a row without columns
  if (schemas.isEmpty()) {
    return;
  }

  memSize += MemUtils.getAlignedRecordsSize(types, rowValues, disableMemControl);
  writeAlignedRow(insertRowNode.getDeviceID(), schemas, insertRowNode.getTime(), rowValues);

  final int pointsInserted = columnCount - insertRowNode.getFailedMeasurementNumber();
  totalPointsNum += pointsInserted;
  MetricService.getInstance()
      .count(
          pointsInserted,
          Metric.QUANTITY.toString(),
          MetricLevel.IMPORTANT,
          Tag.NAME.toString(),
          METRIC_POINT_IN);
}
/**
 * Writes rows {@code [start, end)} of a non-aligned tablet and updates the memory size, the point
 * counter and the "pointsIn" metric.
 *
 * @param insertTabletPlan tablet to insert
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 * @throws WriteProcessException wrapping any {@link RuntimeException} raised while writing
 */
@Override
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
    throws WriteProcessException {
  updatePlanIndexes(insertTabletPlan.getIndex());
  try {
    write(insertTabletPlan, start, end);
    memSize += MemUtils.getTabletSize(insertTabletPlan, start, end, disableMemControl);

    final int writtenColumns =
        insertTabletPlan.getDataTypes().length - insertTabletPlan.getFailedMeasurementNumber();
    final int rowCount = end - start;
    final int pointsInserted = writtenColumns * rowCount;
    totalPointsNum += pointsInserted;
    MetricService.getInstance()
        .count(
            pointsInserted,
            Metric.QUANTITY.toString(),
            MetricLevel.IMPORTANT,
            Tag.NAME.toString(),
            METRIC_POINT_IN);
  } catch (RuntimeException cause) {
    throw new WriteProcessException(cause);
  }
}
/**
 * Writes rows {@code [start, end)} of an aligned tablet and updates the memory size, the point
 * counter and the "pointsIn" metric.
 *
 * @param insertTabletPlan aligned tablet to insert
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 * @throws WriteProcessException wrapping any {@link RuntimeException} raised while writing
 */
@Override
public void insertAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int end)
    throws WriteProcessException {
  updatePlanIndexes(insertTabletPlan.getIndex());
  try {
    writeAlignedTablet(insertTabletPlan, start, end);
    memSize += MemUtils.getAlignedTabletSize(insertTabletPlan, start, end, disableMemControl);

    final int writtenColumns =
        insertTabletPlan.getDataTypes().length - insertTabletPlan.getFailedMeasurementNumber();
    final int rowCount = end - start;
    final int pointsInserted = writtenColumns * rowCount;
    totalPointsNum += pointsInserted;
    MetricService.getInstance()
        .count(
            pointsInserted,
            Metric.QUANTITY.toString(),
            MetricLevel.IMPORTANT,
            Tag.NAME.toString(),
            METRIC_POINT_IN);
  } catch (RuntimeException cause) {
    throw new WriteProcessException(cause);
  }
}
/**
 * Writes rows {@code [start, end)} of a non-aligned tablet node and updates the memory size, the
 * point counter and the "pointsIn" metric.
 *
 * @param insertTabletNode tablet to insert
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 * @throws WriteProcessException wrapping any {@link RuntimeException} raised while writing
 */
@Override
public void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
    throws WriteProcessException {
  // TODO: PlanIndex - the node does not supply an index yet, so 0 is recorded
  updatePlanIndexes(0);
  try {
    write(insertTabletNode, start, end);
    memSize += MemUtils.getTabletSize(insertTabletNode, start, end, disableMemControl);

    final int writtenColumns =
        insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber();
    final int rowCount = end - start;
    final int pointsInserted = writtenColumns * rowCount;
    totalPointsNum += pointsInserted;
    MetricService.getInstance()
        .count(
            pointsInserted,
            Metric.QUANTITY.toString(),
            MetricLevel.IMPORTANT,
            Tag.NAME.toString(),
            METRIC_POINT_IN);
  } catch (RuntimeException cause) {
    throw new WriteProcessException(cause);
  }
}
/**
 * Writes rows {@code [start, end)} of an aligned tablet node and updates the memory size, the
 * point counter and the "pointsIn" metric.
 *
 * @param insertTabletNode aligned tablet to insert
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 * @throws WriteProcessException wrapping any {@link RuntimeException} raised while writing
 */
@Override
public void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end)
    throws WriteProcessException {
  // TODO: PlanIndex - the node does not supply an index yet, so 0 is recorded
  updatePlanIndexes(0);
  try {
    writeAlignedTablet(insertTabletNode, start, end);
    memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end, disableMemControl);

    final int writtenColumns =
        insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber();
    final int rowCount = end - start;
    final int pointsInserted = writtenColumns * rowCount;
    totalPointsNum += pointsInserted;
    MetricService.getInstance()
        .count(
            pointsInserted,
            Metric.QUANTITY.toString(),
            MetricLevel.IMPORTANT,
            Tag.NAME.toString(),
            METRIC_POINT_IN);
  } catch (RuntimeException cause) {
    throw new WriteProcessException(cause);
  }
}
/**
 * Writes one row into the (non-aligned) chunk group of the device and marks this memtable for
 * flushing when the chunk group reports that a flush is needed.
 *
 * @param deviceId target device
 * @param schemaList schemas of the row's columns; may contain {@code null} placeholders
 * @param insertTime timestamp of the row
 * @param objectValue values of the row
 */
@Override
public void write(
    IDeviceID deviceId,
    List<IMeasurementSchema> schemaList,
    long insertTime,
    Object[] objectValue) {
  final IWritableMemChunkGroup group = createMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
  final boolean flushNeeded = group.writeWithFlushCheck(insertTime, objectValue, schemaList);
  if (flushNeeded) {
    shouldFlush = true;
  }
}
/**
 * Writes one row into the aligned chunk group of the device and marks this memtable for flushing
 * when the chunk group reports that a flush is needed.
 *
 * @param deviceId target device
 * @param schemaList schemas of the row's columns; may contain {@code null} placeholders
 * @param insertTime timestamp of the row
 * @param objectValue values of the row
 */
@Override
public void writeAlignedRow(
    IDeviceID deviceId,
    List<IMeasurementSchema> schemaList,
    long insertTime,
    Object[] objectValue) {
  final IWritableMemChunkGroup group =
      createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
  final boolean flushNeeded = group.writeWithFlushCheck(insertTime, objectValue, schemaList);
  if (flushNeeded) {
    shouldFlush = true;
  }
}
/**
 * Writes rows {@code [start, end)} of a non-aligned tablet into the device's chunk group.
 *
 * <p>A column whose data array is {@code null} gets a {@code null} schema placeholder.
 *
 * @param insertTabletPlan tablet to write; its device id is filled in when absent
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 */
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@Override
public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
  // plans not coming from the storage engine carry no device id yet
  if (insertTabletPlan.getDeviceID() == null) {
    insertTabletPlan.setDeviceID(deviceIDFactory.getDeviceID(insertTabletPlan.getDevicePath()));
  }

  final int columnCount = insertTabletPlan.getMeasurements().length;
  final List<IMeasurementSchema> schemas = new ArrayList<>();
  for (int col = 0; col < columnCount; col++) {
    schemas.add(
        insertTabletPlan.getColumns()[col] == null
            ? null
            : insertTabletPlan.getMeasurementMNodes()[col].getSchema());
  }

  final IWritableMemChunkGroup group =
      createMemChunkGroupIfNotExistAndGet(insertTabletPlan.getDeviceID(), schemas);
  final boolean flushNeeded =
      group.writeValuesWithFlushCheck(
          insertTabletPlan.getTimes(),
          insertTabletPlan.getColumns(),
          insertTabletPlan.getBitMaps(),
          schemas,
          start,
          end);
  if (flushNeeded) {
    shouldFlush = true;
  }
}
/**
 * Writes rows {@code [start, end)} of a non-aligned tablet node into the device's chunk group.
 *
 * <p>A column whose data array is {@code null} gets a {@code null} schema placeholder.
 *
 * @param insertTabletNode tablet to write; its device id is filled in when absent
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 */
public void write(InsertTabletNode insertTabletNode, int start, int end) {
  // nodes not coming from the storage engine carry no device id yet
  if (insertTabletNode.getDeviceID() == null) {
    insertTabletNode.setDeviceID(
        DeviceIDFactory.getInstance().getDeviceID(insertTabletNode.getDevicePath()));
  }

  final int columnCount = insertTabletNode.getMeasurementSchemas().length;
  final List<IMeasurementSchema> schemas = new ArrayList<>();
  for (int col = 0; col < columnCount; col++) {
    schemas.add(
        insertTabletNode.getColumns()[col] == null
            ? null
            : insertTabletNode.getMeasurementSchemas()[col]);
  }

  final IWritableMemChunkGroup group =
      createMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(), schemas);
  final boolean flushNeeded =
      group.writeValuesWithFlushCheck(
          insertTabletNode.getTimes(),
          insertTabletNode.getColumns(),
          insertTabletNode.getBitMaps(),
          schemas,
          start,
          end);
  if (flushNeeded) {
    shouldFlush = true;
  }
}
/**
 * Writes rows {@code [start, end)} of an aligned tablet into the device's aligned chunk group.
 *
 * <p>A column whose data array is {@code null} gets a {@code null} schema placeholder. Nothing is
 * written when the tablet has no columns at all.
 *
 * @param insertTabletPlan tablet to write; its device id is filled in when absent
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 */
@Override
public void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int end) {
  // plans not coming from the storage engine carry no device id yet
  if (insertTabletPlan.getDeviceID() == null) {
    insertTabletPlan.setDeviceID(deviceIDFactory.getDeviceID(insertTabletPlan.getDevicePath()));
  }

  final int columnCount = insertTabletPlan.getMeasurements().length;
  final List<IMeasurementSchema> schemas = new ArrayList<>();
  for (int col = 0; col < columnCount; col++) {
    schemas.add(
        insertTabletPlan.getColumns()[col] == null
            ? null
            : insertTabletPlan.getMeasurementMNodes()[col].getSchema());
  }
  if (schemas.isEmpty()) {
    return;
  }

  final IWritableMemChunkGroup group =
      createAlignedMemChunkGroupIfNotExistAndGet(insertTabletPlan.getDeviceID(), schemas);
  final boolean flushNeeded =
      group.writeValuesWithFlushCheck(
          insertTabletPlan.getTimes(),
          insertTabletPlan.getColumns(),
          insertTabletPlan.getBitMaps(),
          schemas,
          start,
          end);
  if (flushNeeded) {
    shouldFlush = true;
  }
}
/**
 * Writes rows {@code [start, end)} of an aligned tablet node into the device's aligned chunk
 * group.
 *
 * <p>A column whose data array is {@code null} gets a {@code null} schema placeholder. Nothing is
 * written when the tablet has no columns at all.
 *
 * @param insertTabletNode tablet to write; its device id is filled in when absent
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 */
public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start, int end) {
  // nodes not coming from the storage engine carry no device id yet
  if (insertTabletNode.getDeviceID() == null) {
    insertTabletNode.setDeviceID(
        DeviceIDFactory.getInstance().getDeviceID(insertTabletNode.getDevicePath()));
  }

  final int columnCount = insertTabletNode.getMeasurementSchemas().length;
  final List<IMeasurementSchema> schemas = new ArrayList<>();
  for (int col = 0; col < columnCount; col++) {
    schemas.add(
        insertTabletNode.getColumns()[col] == null
            ? null
            : insertTabletNode.getMeasurementSchemas()[col]);
  }
  if (schemas.isEmpty()) {
    return;
  }

  final IWritableMemChunkGroup group =
      createAlignedMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(), schemas);
  final boolean flushNeeded =
      group.writeValuesWithFlushCheck(
          insertTabletNode.getTimes(),
          insertTabletNode.getColumns(),
          insertTabletNode.getBitMaps(),
          schemas,
          start,
          end);
  if (flushNeeded) {
    shouldFlush = true;
  }
}
/**
 * Tells whether this memtable holds no chunk for the given series.
 *
 * @param deviceId device to look up
 * @param measurement measurement id within the device
 * @return {@code true} if the device has no chunk group or the group does not contain the
 *     measurement
 */
@Override
public boolean checkIfChunkDoesNotExist(IDeviceID deviceId, String measurement) {
  final IWritableMemChunkGroup group = memTableMap.get(deviceId);
  return group == null || !group.contains(measurement);
}
/**
 * Returns the current TVList size of a series as reported by the device's chunk group.
 *
 * @param deviceId device to look up
 * @param measurement measurement id within the device
 * @return the size reported by the chunk group, or {@code 0} when the device has no chunk group
 *     in this memtable
 */
@Override
public long getCurrentTVListSize(IDeviceID deviceId, String measurement) {
  IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
  if (memChunkGroup == null) {
    // unknown device: nothing has been written yet, so report 0 instead of failing with an NPE
    return 0;
  }
  return memChunkGroup.getCurrentTVListSize(measurement);
}
/** @return the number of series counted so far for this memtable */
@Override
public int getSeriesNumber() {
  return this.seriesNumber;
}
/** @return the running total of points tracked by this memtable */
@Override
public long getTotalPointsNum() {
  return this.totalPointsNum;
}
/** @return the sum of {@code count()} over all chunk groups of this memtable */
@Override
public long size() {
  return memTableMap.values().stream().mapToLong(IWritableMemChunkGroup::count).sum();
}
/** @return the tracked memory size of the data points, including TEXT values */
@Override
public long memSize() {
  return this.memSize;
}
/**
 * @return {@code true} if at least one point is tracked and the total point count has reached
 *     the accumulated threshold; always {@code false} while no point is tracked
 */
@Override
public boolean reachTotalPointNumThreshold() {
  return totalPointsNum != 0 && totalPointsNum >= totalPointsNumThreshold;
}
/**
 * Removes all data from this memtable and resets its size, series, point and plan-index
 * bookkeeping.
 *
 * <p>The plan indexes are restored to the same sentinels the fields are initialized with
 * ({@code Long.MIN_VALUE} for the maximum, {@code Long.MAX_VALUE} for the minimum). Resetting
 * them to {@code 0} would make the min/max tracking of {@code updatePlanIndexes} start from 0
 * instead of from the first index recorded after the clear.
 */
@Override
public void clear() {
  memTableMap.clear();
  memSize = 0;
  seriesNumber = 0;
  totalPointsNum = 0;
  totalPointsNumThreshold = 0;
  tvListRamCost = 0;
  maxPlanIndex = Long.MIN_VALUE;
  minPlanIndex = Long.MAX_VALUE;
}
/** @return {@code true} if no device has a chunk group in this memtable */
@Override
public boolean isEmpty() {
  return this.memTableMap.isEmpty();
}
/**
 * Builds a read-only chunk for the given path by delegating to the path's {@link
 * ResourceByPathUtils} instance.
 *
 * @param fullPath series path to read
 * @param ttlLowerBound lower time bound forwarded to the lookup
 * @param modsToMemtable modifications paired with the memtable they belong to
 * @return the read-only chunk produced by the delegate
 * @throws IOException if the delegate fails with an I/O error
 * @throws QueryProcessException if the delegate fails to process the query
 */
@Override
public ReadOnlyMemChunk query(
    PartialPath fullPath, long ttlLowerBound, List<Pair<Modification, IMemTable>> modsToMemtable)
    throws IOException, QueryProcessException {
  final ReadOnlyMemChunk chunk =
      ResourceByPathUtils.getResourceInstance(fullPath)
          .getReadOnlyMemChunkFromMemTable(this, modsToMemtable, ttlLowerBound);
  return chunk;
}
/**
 * Deletes the data of {@code originalPath} within {@code [startTimestamp, endTimestamp]} from the
 * device's chunk group, subtracts the amount reported by the group from the point counter, and
 * drops the group from this memtable once it holds no chunk anymore.
 *
 * <p>Does nothing when the device has no chunk group in this memtable.
 *
 * @param originalPath path whose data is deleted
 * @param devicePath path of the owning device
 * @param startTimestamp start of the deleted time range
 * @param endTimestamp end of the deleted time range
 */
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@Override
public void delete(
    PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) {
  // resolve the device id once and reuse it for both the lookup and the removal
  IDeviceID deviceID = getDeviceID(devicePath);
  IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceID);
  if (memChunkGroup == null) {
    return;
  }
  totalPointsNum -= memChunkGroup.delete(originalPath, devicePath, startTimestamp, endTimestamp);
  if (memChunkGroup.getMemChunkMap().isEmpty()) {
    memTableMap.remove(deviceID);
  }
}
/**
 * Increases the tracked TVList memory cost.
 *
 * @param cost amount to add
 */
@Override
public void addTVListRamCost(long cost) {
  tvListRamCost = tvListRamCost + cost;
}
/**
 * Decreases the tracked TVList memory cost.
 *
 * @param cost amount to subtract
 */
@Override
public void releaseTVListRamCost(long cost) {
  tvListRamCost = tvListRamCost - cost;
}
/** @return the tracked memory cost of all TVLists */
@Override
public long getTVListsRamCost() {
  return this.tvListRamCost;
}
/**
 * Adds the size of TEXT data to the tracked memory size.
 *
 * @param textDataSize amount to add
 */
@Override
public void addTextDataSize(long textDataSize) {
  memSize = memSize + textDataSize;
}
/**
 * Subtracts the size of TEXT data from the tracked memory size.
 *
 * @param textDataSize amount to subtract
 */
@Override
public void releaseTextDataSize(long textDataSize) {
  memSize = memSize - textDataSize;
}
/** Marks this memtable as needing a flush. */
@Override
public void setShouldFlush() {
  this.shouldFlush = true;
}
/** @return {@code true} once this memtable has been marked as needing a flush */
@Override
public boolean shouldFlush() {
  return this.shouldFlush;
}
/** Calls {@code release()} on every chunk group held by this memtable. */
@Override
public void release() {
  memTableMap.values().forEach(IWritableMemChunkGroup::release);
}
/** @return the largest plan index recorded so far */
@Override
public long getMaxPlanIndex() {
  return this.maxPlanIndex;
}
/** @return the smallest plan index recorded so far */
@Override
public long getMinPlanIndex() {
  return this.minPlanIndex;
}
/**
 * Widens the recorded plan-index range so that it includes {@code index}.
 *
 * @param index plan index of the operation being applied
 */
void updatePlanIndexes(long index) {
  if (index > maxPlanIndex) {
    maxPlanIndex = index;
  }
  if (index < minPlanIndex) {
    minPlanIndex = index;
  }
}
/** @return the identifier assigned to this memtable at construction */
@Override
public long getMemTableId() {
  return this.memTableId;
}
/** @return the {@code System.currentTimeMillis()} value captured when this memtable was created */
@Override
public long getCreatedTime() {
  return this.createdTime;
}
/** @return the current flush status of this memtable */
@Override
public FlushStatus getFlushStatus() {
  return this.flushStatus;
}
/**
 * Replaces the flush status of this memtable.
 *
 * @param flushStatus new status
 */
@Override
public void setFlushStatus(FlushStatus flushStatus) {
  this.flushStatus = flushStatus;
}
/**
 * Resolves a device path to its device id through the shared {@link DeviceIDFactory}.
 *
 * @param deviceId path of the device
 * @return the device id produced by the factory
 */
private IDeviceID getDeviceID(PartialPath deviceId) {
  final IDeviceID resolved = deviceIDFactory.getDeviceID(deviceId);
  return resolved;
}
/**
 * Computes the number of bytes needed to serialize this memtable: a single byte for a signal
 * memtable, otherwise the fixed-size header plus, for every device, its id string, one flag byte
 * and the serialized size of its chunk group.
 *
 * <p>Notice: this method is concurrent unsafe
 *
 * @return the serialized size in bytes
 */
@Override
public int serializedSize() {
  if (isSignalMemTable()) {
    return Byte.BYTES;
  }
  int totalBytes = FIXED_SERIALIZED_SIZE;
  for (Map.Entry<IDeviceID, IWritableMemChunkGroup> deviceEntry : memTableMap.entrySet()) {
    final int deviceIdBytes = ReadWriteIOUtils.sizeToWrite(deviceEntry.getKey().toStringID());
    final int groupBytes = deviceEntry.getValue().serializedSize();
    totalBytes += deviceIdBytes;
    totalBytes += Byte.BYTES;
    totalBytes += groupBytes;
  }
  return totalBytes;
}
/**
 * Serializes this memtable into the WAL buffer: first the signal flag, then (for non-signal
 * memtables only) the counters, the number of devices and, per device, its id string, an
 * "is aligned" flag and the chunk group itself.
 *
 * <p>Notice: this method is concurrent unsafe
 *
 * @param buffer destination buffer
 */
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
  WALWriteUtils.write(isSignalMemTable(), buffer);
  if (isSignalMemTable()) {
    // a signal memtable carries nothing but the flag
    return;
  }

  // fixed-size header
  buffer.putInt(seriesNumber);
  buffer.putLong(memSize);
  buffer.putLong(tvListRamCost);
  buffer.putLong(totalPointsNum);
  buffer.putLong(totalPointsNumThreshold);
  buffer.putLong(maxPlanIndex);
  buffer.putLong(minPlanIndex);

  // per-device payload
  buffer.putInt(memTableMap.size());
  for (Map.Entry<IDeviceID, IWritableMemChunkGroup> deviceEntry : memTableMap.entrySet()) {
    final IWritableMemChunkGroup group = deviceEntry.getValue();
    final boolean aligned = group instanceof AlignedWritableMemChunkGroup;
    WALWriteUtils.write(deviceEntry.getKey().toStringID(), buffer);
    WALWriteUtils.write(aligned, buffer);
    group.serializeToWAL(buffer);
  }
}
/**
 * Restores the counters and the per-device chunk groups of this memtable from a stream, reading
 * the fields in the order {@code serializeToWAL} writes them after the signal flag.
 *
 * @param stream source positioned right after the signal flag
 * @throws IOException if reading from the stream fails
 */
public void deserialize(DataInputStream stream) throws IOException {
  // fixed-size header
  seriesNumber = stream.readInt();
  memSize = stream.readLong();
  tvListRamCost = stream.readLong();
  totalPointsNum = stream.readLong();
  totalPointsNumThreshold = stream.readLong();
  maxPlanIndex = stream.readLong();
  minPlanIndex = stream.readLong();

  // per-device payload
  final int deviceCount = stream.readInt();
  int restored = 0;
  while (restored < deviceCount) {
    final IDeviceID deviceID = deviceIDFactory.getDeviceID(ReadWriteIOUtils.readString(stream));
    final boolean aligned = ReadWriteIOUtils.readBool(stream);
    final IWritableMemChunkGroup group =
        aligned
            ? AlignedWritableMemChunkGroup.deserialize(stream)
            : WritableMemChunkGroup.deserialize(stream);
    memTableMap.put(deviceID, group);
    ++restored;
  }
}
/** Static factory that rebuilds a memtable from its serialized form. */
public static class Factory {
  private Factory() {}

  /**
   * Reads the leading signal flag and returns either a {@link NotifyFlushMemTable} (flag set) or
   * a {@link PrimitiveMemTable} deserialized from the rest of the stream.
   *
   * @param stream source to read from
   * @return the restored memtable
   * @throws IOException if reading from the stream fails
   */
  public static IMemTable create(DataInputStream stream) throws IOException {
    if (ReadWriteIOUtils.readBool(stream)) {
      return new NotifyFlushMemTable();
    }
    final PrimitiveMemTable restored = new PrimitiveMemTable();
    restored.deserialize(stream);
    return restored;
  }
}
}