blob: c0acf629346c8e88a4c03ffa6829fc3c53377514 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.rescon;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/** Manage all primitive data list in memory, including get and release operation. */
public class PrimitiveArrayManager {
/** data type -> ArrayDeque of primitive arrays. */
private static final Map<TSDataType, ArrayDeque<Object>> bufferedArraysMap =
new EnumMap<>(TSDataType.class);
/** data type -> current number of buffered arrays */
private static final Map<TSDataType, Integer> bufferedArraysNumMap =
new EnumMap<>(TSDataType.class);
/** data type -> ratio of data type in schema, which could be seen as recommended ratio */
private static final Map<TSDataType, Double> bufferedArraysNumRatio =
new EnumMap<>(TSDataType.class);
private static final Logger logger = LoggerFactory.getLogger(PrimitiveArrayManager.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
public static final int ARRAY_SIZE = config.getPrimitiveArraySize();
/** threshold total size of arrays for all data types */
private static final double BUFFERED_ARRAY_SIZE_THRESHOLD =
config.getAllocateMemoryForWrite() * config.getBufferedArraysMemoryProportion();
/** total size of buffered arrays */
private static AtomicLong bufferedArraysRamSize = new AtomicLong();
/** total size of out of buffer arrays */
private static AtomicLong outOfBufferArraysRamSize = new AtomicLong();
static {
bufferedArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.INT32, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.INT64, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.VECTOR, new ArrayDeque<>());
}
private PrimitiveArrayManager() {
logger.info("BufferedArraySizeThreshold is {}", BUFFERED_ARRAY_SIZE_THRESHOLD);
}
/**
* Get primitive data lists according to type
*
* @param dataType data type
* @return an array
*/
public static Object getPrimitiveArraysByType(TSDataType dataType) {
// check memory of buffered array, if already full, generate OOB
if (bufferedArraysRamSize.get() + ARRAY_SIZE * dataType.getDataTypeSize()
> BUFFERED_ARRAY_SIZE_THRESHOLD) {
// return an out of buffer array
outOfBufferArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
return createPrimitiveArray(dataType);
}
synchronized (bufferedArraysMap.get(dataType)) {
// try to get a buffered array
Object dataArray = bufferedArraysMap.get(dataType).poll();
if (dataArray != null) {
return dataArray;
}
// no buffered array, create one
bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
bufferedArraysRamSize.addAndGet((long) ARRAY_SIZE * dataType.getDataTypeSize());
}
return createPrimitiveArray(dataType);
}
private static Object createPrimitiveArray(TSDataType dataType) {
Object dataArray;
switch (dataType) {
case BOOLEAN:
dataArray = new boolean[ARRAY_SIZE];
break;
case INT32:
dataArray = new int[ARRAY_SIZE];
break;
case INT64:
dataArray = new long[ARRAY_SIZE];
break;
case FLOAT:
dataArray = new float[ARRAY_SIZE];
break;
case DOUBLE:
dataArray = new double[ARRAY_SIZE];
break;
case TEXT:
dataArray = new Binary[ARRAY_SIZE];
break;
case VECTOR:
dataArray = new byte[ARRAY_SIZE][];
break;
default:
throw new UnSupportedDataTypeException(dataType.toString());
}
return dataArray;
}
/**
* Get primitive data lists according to data type and size, only for TVList's sorting
*
* @param dataType data type
* @param size needed capacity
* @return an array of primitive data arrays
*/
public static Object createDataListsByType(TSDataType dataType, int size) {
int arrayNumber = (int) Math.ceil((float) size / (float) ARRAY_SIZE);
switch (dataType) {
case BOOLEAN:
boolean[][] booleans = new boolean[arrayNumber][];
for (int i = 0; i < arrayNumber; i++) {
booleans[i] = new boolean[ARRAY_SIZE];
}
return booleans;
case INT32:
int[][] ints = new int[arrayNumber][];
for (int i = 0; i < arrayNumber; i++) {
ints[i] = new int[ARRAY_SIZE];
}
return ints;
case INT64:
long[][] longs = new long[arrayNumber][];
for (int i = 0; i < arrayNumber; i++) {
longs[i] = new long[ARRAY_SIZE];
}
return longs;
case FLOAT:
float[][] floats = new float[arrayNumber][];
for (int i = 0; i < arrayNumber; i++) {
floats[i] = new float[ARRAY_SIZE];
}
return floats;
case DOUBLE:
double[][] doubles = new double[arrayNumber][];
for (int i = 0; i < arrayNumber; i++) {
doubles[i] = new double[ARRAY_SIZE];
}
return doubles;
case TEXT:
Binary[][] binaries = new Binary[arrayNumber][];
for (int i = 0; i < arrayNumber; i++) {
binaries[i] = new Binary[ARRAY_SIZE];
}
return binaries;
default:
return null;
}
}
/**
* This method is called when bringing back data array
*
* @param dataArray data array
*/
public static void release(Object dataArray) {
TSDataType dataType;
if (dataArray instanceof boolean[]) {
dataType = TSDataType.BOOLEAN;
} else if (dataArray instanceof int[]) {
dataType = TSDataType.INT32;
} else if (dataArray instanceof long[]) {
dataType = TSDataType.INT64;
} else if (dataArray instanceof float[]) {
dataType = TSDataType.FLOAT;
} else if (dataArray instanceof double[]) {
dataType = TSDataType.DOUBLE;
} else if (dataArray instanceof Binary[]) {
Arrays.fill((Binary[]) dataArray, null);
dataType = TSDataType.TEXT;
} else {
throw new UnSupportedDataTypeException("Unknown data array type");
}
// Check out of buffer array num
if (outOfBufferArraysRamSize.get() > 0 && isCurrentDataTypeExceeded(dataType)) {
// release an out of buffer array
bringBackOOBArray(dataType, ARRAY_SIZE);
} else if (outOfBufferArraysRamSize.get() > 0 && !isCurrentDataTypeExceeded(dataType)) {
// if the ratio of buffered arrays of this data type does not exceed the schema ratio,
// choose one replaced array who has larger ratio than schema recommended ratio
TSDataType replacedDataType = null;
for (Map.Entry<TSDataType, Integer> entry : bufferedArraysNumMap.entrySet()) {
if (isCurrentDataTypeExceeded(entry.getKey())) {
replacedDataType = entry.getKey();
// bring back the replaced array as OOB array
bringBackOOBArray(replacedDataType, ARRAY_SIZE);
break;
}
}
if (replacedDataType != null) {
// if we find a replaced array, bring back the original array as a buffered array
if (logger.isDebugEnabled()) {
logger.debug(
"The ratio of {} in buffered array has not reached the schema ratio. Replaced by {}",
dataType,
replacedDataType);
}
bringBackBufferedArray(dataType, dataArray);
} else {
// or else bring back the original array as OOB array
bringBackOOBArray(dataType, ARRAY_SIZE);
}
} else {
// if there is no out of buffer array, bring back as buffered array directly
bringBackBufferedArray(dataType, dataArray);
}
}
/**
* Bring back a buffered array
*
* @param dataType data type
* @param dataArray data array
*/
private static void bringBackBufferedArray(TSDataType dataType, Object dataArray) {
synchronized (bufferedArraysMap.get(dataType)) {
bufferedArraysMap.get(dataType).add(dataArray);
bufferedArraysNumMap.put(dataType, bufferedArraysNumMap.getOrDefault(dataType, 0) + 1);
}
bufferedArraysRamSize.addAndGet((long) -ARRAY_SIZE * dataType.getDataTypeSize());
}
/**
* Bring back out of buffered array
*
* @param dataType data type
* @param size capacity
*/
private static void bringBackOOBArray(TSDataType dataType, int size) {
outOfBufferArraysRamSize.addAndGet((long) -size * dataType.getDataTypeSize());
}
/**
* @param schemaDataTypeNumMap schema DataType Num Map (for each series, increase a long and a
* specific type)
* @param total current DataType Total Num (twice of number of time series)
*/
public static void updateSchemaDataTypeNum(
Map<TSDataType, Integer> schemaDataTypeNumMap, long total) {
for (Map.Entry<TSDataType, Integer> entry : schemaDataTypeNumMap.entrySet()) {
TSDataType dataType = entry.getKey();
bufferedArraysNumRatio.put(dataType, (double) schemaDataTypeNumMap.get(dataType) / total);
}
}
/**
* check whether the ratio of buffered array of specific data type reaches the ratio in schema (as
* recommended ratio)
*
* @param dataType data type
* @return true if the buffered array ratio exceeds the recommend ratio
*/
private static boolean isCurrentDataTypeExceeded(TSDataType dataType) {
int total = 0;
for (int num : bufferedArraysNumMap.values()) {
total += num;
}
return total != 0
&& ((double) bufferedArraysNumMap.getOrDefault(dataType, 0) / total
> bufferedArraysNumRatio.getOrDefault(dataType, 0.0));
}
public static void close() {
for (ArrayDeque<Object> dataListQueue : bufferedArraysMap.values()) {
dataListQueue.clear();
}
bufferedArraysNumMap.clear();
bufferedArraysNumRatio.clear();
bufferedArraysRamSize.set(0);
outOfBufferArraysRamSize.set(0);
}
}