blob: aea79ce447532c3992c1a723c58956500254a6d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.conf;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.constant.CrossCompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.constant.InnerSequenceCompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.constant.InnerUnsequenceCompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
import org.apache.iotdb.external.api.IPropertiesLoader;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.config.ReloadLevel;
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalMemoryReporter;
import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalReporter;
import org.apache.iotdb.metrics.utils.InternalReporterType;
import org.apache.iotdb.metrics.utils.NodeType;
import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSType;
import org.apache.tsfile.utils.FilePathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileStore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
public class IoTDBDescriptor {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDescriptor.class);
private final CommonDescriptor commonDescriptor = CommonDescriptor.getInstance();
private final IoTDBConfig conf = new IoTDBConfig();
private static final long MAX_THROTTLE_THRESHOLD = 600 * 1024 * 1024 * 1024L;
private static final long MIN_THROTTLE_THRESHOLD = 50 * 1024 * 1024 * 1024L;
private static final double MAX_DIR_USE_PROPORTION = 0.8;
private static final double MIN_DIR_USE_PROPORTION = 0.5;
protected IoTDBDescriptor() {
loadProps();
ServiceLoader<IPropertiesLoader> propertiesLoaderServiceLoader =
ServiceLoader.load(IPropertiesLoader.class);
for (IPropertiesLoader loader : propertiesLoaderServiceLoader) {
LOGGER.info("Will reload properties from {} ", loader.getClass().getName());
Properties properties = loader.loadProperties();
try {
loadProperties(properties);
} catch (Exception e) {
LOGGER.error(
"Failed to reload properties from {}, reject DataNode startup.",
loader.getClass().getName(),
e);
System.exit(-1);
}
conf.setCustomizedProperties(loader.getCustomizedProperties());
TSFileDescriptor.getInstance().overwriteConfigByCustomSettings(properties);
TSFileDescriptor.getInstance()
.getConfig()
.setCustomizedProperties(loader.getCustomizedProperties());
}
}
public static IoTDBDescriptor getInstance() {
return IoTDBDescriptorHolder.INSTANCE;
}
public IoTDBConfig getConfig() {
return conf;
}
/**
* get props url location
*
* @return url object if location exit, otherwise null.
*/
public URL getPropsUrl(String configFileName) {
String urlString = commonDescriptor.getConfDir();
if (urlString == null) {
// If urlString wasn't provided, try to find a default config in the root of the classpath.
URL uri = IoTDBConfig.class.getResource("/" + configFileName);
if (uri != null) {
return uri;
}
LOGGER.warn(
"Cannot find IOTDB_HOME or IOTDB_CONF environment variable when loading "
+ "config file {}, use default configuration",
configFileName);
// update all data seriesPath
conf.updatePath();
return null;
}
// If a config location was provided, but it doesn't end with a properties file,
// append the default location.
else if (!urlString.endsWith(".properties")) {
urlString += (File.separatorChar + configFileName);
}
// If the url doesn't start with "file:" or "classpath:", it's provided as a no path.
// So we need to add it to make it a real URL.
if (!urlString.startsWith("file:") && !urlString.startsWith("classpath:")) {
urlString = "file:" + urlString;
}
try {
return new URL(urlString);
} catch (MalformedURLException e) {
return null;
}
}
/** load an property file and set TsfileDBConfig variables. */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void loadProps() {
URL url = getPropsUrl(CommonConfig.CONFIG_NAME);
Properties commonProperties = new Properties();
if (url != null) {
try (InputStream inputStream = url.openStream()) {
LOGGER.info("Start to read config file {}", url);
commonProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
} catch (FileNotFoundException e) {
LOGGER.error("Fail to find config file {}, reject DataNode startup.", url, e);
System.exit(-1);
} catch (IOException e) {
LOGGER.error("Cannot load config file, reject DataNode startup.", e);
System.exit(-1);
} catch (Exception e) {
LOGGER.error("Incorrect format in config file, reject DataNode startup.", e);
System.exit(-1);
}
} else {
LOGGER.warn(
"Couldn't load the configuration {} from any of the known sources.",
CommonConfig.CONFIG_NAME);
}
url = getPropsUrl(IoTDBConfig.CONFIG_NAME);
if (url != null) {
try (InputStream inputStream = url.openStream()) {
LOGGER.info("Start to read config file {}", url);
Properties properties = new Properties();
properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
commonProperties.putAll(properties);
loadProperties(commonProperties);
} catch (FileNotFoundException e) {
LOGGER.error("Fail to find config file {}, reject DataNode startup.", url, e);
System.exit(-1);
} catch (IOException e) {
LOGGER.error("Cannot load config file, reject DataNode startup.", e);
System.exit(-1);
} catch (Exception e) {
LOGGER.error("Incorrect format in config file, reject DataNode startup.", e);
System.exit(-1);
} finally {
// update all data seriesPath
conf.updatePath();
commonDescriptor.getConfig().updatePath(System.getProperty(IoTDBConstant.IOTDB_HOME, null));
MetricConfigDescriptor.getInstance().loadProps(commonProperties);
MetricConfigDescriptor.getInstance()
.getMetricConfig()
.updateRpcInstance(
conf.getClusterName(), NodeType.DATANODE, SchemaConstant.SYSTEM_DATABASE);
}
} else {
LOGGER.warn(
"Couldn't load the configuration {} from any of the known sources.",
IoTDBConfig.CONFIG_NAME);
}
}
public void loadProperties(Properties properties) throws BadNodeUrlException, IOException {
conf.setClusterName(
properties.getProperty(IoTDBConstant.CLUSTER_NAME, conf.getClusterName()).trim());
conf.setRpcAddress(
properties.getProperty(IoTDBConstant.DN_RPC_ADDRESS, conf.getRpcAddress()).trim());
conf.setRpcThriftCompressionEnable(
Boolean.parseBoolean(
properties
.getProperty(
"dn_rpc_thrift_compression_enable",
Boolean.toString(conf.isRpcThriftCompressionEnable()))
.trim()));
conf.setRpcAdvancedCompressionEnable(
Boolean.parseBoolean(
properties
.getProperty(
"dn_rpc_advanced_compression_enable",
Boolean.toString(conf.isRpcAdvancedCompressionEnable()))
.trim()));
conf.setConnectionTimeoutInMS(
Integer.parseInt(
properties
.getProperty(
"dn_connection_timeout_ms", String.valueOf(conf.getConnectionTimeoutInMS()))
.trim()));
if (properties.getProperty("dn_max_connection_for_internal_service", null) != null) {
conf.setMaxClientNumForEachNode(
Integer.parseInt(
properties.getProperty("dn_max_connection_for_internal_service").trim()));
LOGGER.warn(
"The parameter dn_max_connection_for_internal_service is out of date. Please rename it to dn_max_client_count_for_each_node_in_client_manager.");
}
conf.setMaxClientNumForEachNode(
Integer.parseInt(
properties
.getProperty(
"dn_max_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getMaxClientNumForEachNode()))
.trim()));
conf.setSelectorNumOfClientManager(
Integer.parseInt(
properties
.getProperty(
"dn_selector_thread_count_of_client_manager",
String.valueOf(conf.getSelectorNumOfClientManager()))
.trim()));
conf.setRpcPort(
Integer.parseInt(
properties
.getProperty(IoTDBConstant.DN_RPC_PORT, Integer.toString(conf.getRpcPort()))
.trim()));
conf.setBufferedArraysMemoryProportion(
Double.parseDouble(
properties
.getProperty(
"buffered_arrays_memory_proportion",
Double.toString(conf.getBufferedArraysMemoryProportion()))
.trim()));
conf.setFlushProportion(
Double.parseDouble(
properties
.getProperty("flush_proportion", Double.toString(conf.getFlushProportion()))
.trim()));
final double rejectProportion =
Double.parseDouble(
properties
.getProperty("reject_proportion", Double.toString(conf.getRejectProportion()))
.trim());
final double devicePathCacheProportion =
Double.parseDouble(
properties
.getProperty(
"device_path_cache_proportion",
Double.toString(conf.getDevicePathCacheProportion()))
.trim());
if (rejectProportion + devicePathCacheProportion >= 1) {
LOGGER.warn(
"The sum of write_memory_proportion and device_path_cache_proportion is too large, use default values 0.8 and 0.05.");
} else {
conf.setRejectProportion(rejectProportion);
conf.setDevicePathCacheProportion(devicePathCacheProportion);
}
conf.setWriteMemoryVariationReportProportion(
Double.parseDouble(
properties
.getProperty(
"write_memory_variation_report_proportion",
Double.toString(conf.getWriteMemoryVariationReportProportion()))
.trim()));
conf.setMetaDataCacheEnable(
Boolean.parseBoolean(
properties
.getProperty(
"meta_data_cache_enable", Boolean.toString(conf.isMetaDataCacheEnable()))
.trim()));
initMemoryAllocate(properties);
String systemDir = properties.getProperty("dn_system_dir");
if (systemDir == null) {
systemDir = properties.getProperty("base_dir");
if (systemDir != null) {
systemDir = FilePathUtils.regularizePath(systemDir) + IoTDBConstant.SYSTEM_FOLDER_NAME;
} else {
systemDir = conf.getSystemDir();
}
}
conf.setSystemDir(systemDir);
conf.setSchemaDir(
FilePathUtils.regularizePath(conf.getSystemDir()) + IoTDBConstant.SCHEMA_FOLDER_NAME);
conf.setQueryDir(
FilePathUtils.regularizePath(conf.getSystemDir() + IoTDBConstant.QUERY_FOLDER_NAME));
String[] defaultTierDirs = new String[conf.getTierDataDirs().length];
for (int i = 0; i < defaultTierDirs.length; ++i) {
defaultTierDirs[i] = String.join(",", conf.getTierDataDirs()[i]);
}
conf.setTierDataDirs(
parseDataDirs(
properties.getProperty(
"dn_data_dirs", String.join(IoTDBConstant.TIER_SEPARATOR, defaultTierDirs))));
conf.setConsensusDir(properties.getProperty("dn_consensus_dir", conf.getConsensusDir()));
int mlogBufferSize =
Integer.parseInt(
properties.getProperty("mlog_buffer_size", Integer.toString(conf.getMlogBufferSize())));
if (mlogBufferSize > 0) {
conf.setMlogBufferSize(mlogBufferSize);
}
long forceMlogPeriodInMs =
Long.parseLong(
properties.getProperty(
"sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs())));
if (forceMlogPeriodInMs > 0) {
conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
}
String oldMultiDirStrategyClassName = conf.getMultiDirStrategyClassName();
conf.setMultiDirStrategyClassName(
properties.getProperty("dn_multi_dir_strategy", conf.getMultiDirStrategyClassName()));
try {
conf.checkMultiDirStrategyClassName();
} catch (Exception e) {
conf.setMultiDirStrategyClassName(oldMultiDirStrategyClassName);
throw e;
}
conf.setBatchSize(
Integer.parseInt(
properties.getProperty("batch_size", Integer.toString(conf.getBatchSize()))));
conf.setTvListSortAlgorithm(
TVListSortAlgorithm.valueOf(
properties.getProperty(
"tvlist_sort_algorithm", conf.getTvListSortAlgorithm().toString())));
conf.setAvgSeriesPointNumberThreshold(
Integer.parseInt(
properties.getProperty(
"avg_series_point_number_threshold",
Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
conf.setCheckPeriodWhenInsertBlocked(
Integer.parseInt(
properties.getProperty(
"check_period_when_insert_blocked",
Integer.toString(conf.getCheckPeriodWhenInsertBlocked()))));
conf.setMaxWaitingTimeWhenInsertBlocked(
Integer.parseInt(
properties.getProperty(
"max_waiting_time_when_insert_blocked",
Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
conf.setIoTaskQueueSizeForFlushing(
Integer.parseInt(
properties.getProperty(
"io_task_queue_size_for_flushing",
Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
conf.setCompactionScheduleIntervalInMs(
Long.parseLong(
properties.getProperty(
"compaction_schedule_interval_in_ms",
Long.toString(conf.getCompactionScheduleIntervalInMs()))));
conf.setCompactionSubmissionIntervalInMs(
Long.parseLong(
properties.getProperty(
"compaction_submission_interval_in_ms",
Long.toString(conf.getCompactionSubmissionIntervalInMs()))));
conf.setEnableCrossSpaceCompaction(
Boolean.parseBoolean(
properties.getProperty(
"enable_cross_space_compaction",
Boolean.toString(conf.isEnableCrossSpaceCompaction()))));
conf.setEnableSeqSpaceCompaction(
Boolean.parseBoolean(
properties.getProperty(
"enable_seq_space_compaction",
Boolean.toString(conf.isEnableSeqSpaceCompaction()))));
conf.setEnableUnseqSpaceCompaction(
Boolean.parseBoolean(
properties.getProperty(
"enable_unseq_space_compaction",
Boolean.toString(conf.isEnableUnseqSpaceCompaction()))));
conf.setCrossCompactionSelector(
CrossCompactionSelector.getCrossCompactionSelector(
properties.getProperty(
"cross_selector", conf.getCrossCompactionSelector().toString())));
conf.setInnerSequenceCompactionSelector(
InnerSequenceCompactionSelector.getInnerSequenceCompactionSelector(
properties.getProperty(
"inner_seq_selector", conf.getInnerSequenceCompactionSelector().toString())));
conf.setInnerUnsequenceCompactionSelector(
InnerUnsequenceCompactionSelector.getInnerUnsequenceCompactionSelector(
properties.getProperty(
"inner_unseq_selector", conf.getInnerUnsequenceCompactionSelector().toString())));
conf.setInnerSeqCompactionPerformer(
InnerSeqCompactionPerformer.getInnerSeqCompactionPerformer(
properties.getProperty(
"inner_seq_performer", conf.getInnerSeqCompactionPerformer().toString())));
conf.setInnerUnseqCompactionPerformer(
InnerUnseqCompactionPerformer.getInnerUnseqCompactionPerformer(
properties.getProperty(
"inner_unseq_performer", conf.getInnerUnseqCompactionPerformer().toString())));
conf.setCrossCompactionPerformer(
CrossCompactionPerformer.getCrossCompactionPerformer(
properties.getProperty(
"cross_performer", conf.getCrossCompactionPerformer().toString())));
conf.setCompactionPriority(
CompactionPriority.valueOf(
properties.getProperty(
"compaction_priority", conf.getCompactionPriority().toString())));
int subtaskNum =
Integer.parseInt(
properties.getProperty(
"sub_compaction_thread_count", Integer.toString(conf.getSubCompactionTaskNum())));
subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum;
conf.setSubCompactionTaskNum(subtaskNum);
int compactionScheduleThreadNum =
Integer.parseInt(
properties.getProperty(
"compaction_schedule_thread_num",
Integer.toString(conf.getCompactionScheduleThreadNum())));
compactionScheduleThreadNum =
compactionScheduleThreadNum <= 0 ? 1 : compactionScheduleThreadNum;
conf.setCompactionScheduleThreadNum(compactionScheduleThreadNum);
conf.setQueryTimeoutThreshold(
Long.parseLong(
properties.getProperty(
"query_timeout_threshold", Long.toString(conf.getQueryTimeoutThreshold()))));
conf.setSessionTimeoutThreshold(
Integer.parseInt(
properties.getProperty(
"dn_session_timeout_threshold",
Integer.toString(conf.getSessionTimeoutThreshold()))));
conf.setFlushThreadCount(
Integer.parseInt(
properties.getProperty(
"flush_thread_count", Integer.toString(conf.getFlushThreadCount()))));
if (conf.getFlushThreadCount() <= 0) {
conf.setFlushThreadCount(Runtime.getRuntime().availableProcessors());
}
// start: index parameter setting
conf.setIndexRootFolder(properties.getProperty("index_root_dir", conf.getIndexRootFolder()));
conf.setEnableIndex(
Boolean.parseBoolean(
properties.getProperty("enable_index", Boolean.toString(conf.isEnableIndex()))));
conf.setConcurrentIndexBuildThread(
Integer.parseInt(
properties.getProperty(
"concurrent_index_build_thread",
Integer.toString(conf.getConcurrentIndexBuildThread()))));
if (conf.getConcurrentIndexBuildThread() <= 0) {
conf.setConcurrentIndexBuildThread(Runtime.getRuntime().availableProcessors());
}
conf.setDefaultIndexWindowRange(
Integer.parseInt(
properties.getProperty(
"default_index_window_range",
Integer.toString(conf.getDefaultIndexWindowRange()))));
conf.setQueryThreadCount(
Integer.parseInt(
properties.getProperty(
"query_thread_count", Integer.toString(conf.getQueryThreadCount()))));
if (conf.getQueryThreadCount() <= 0) {
conf.setQueryThreadCount(Runtime.getRuntime().availableProcessors());
}
conf.setDegreeOfParallelism(
Integer.parseInt(
properties.getProperty(
"degree_of_query_parallelism", Integer.toString(conf.getDegreeOfParallelism()))));
if (conf.getDegreeOfParallelism() <= 0) {
conf.setDegreeOfParallelism(Runtime.getRuntime().availableProcessors() / 2);
}
conf.setMergeThresholdOfExplainAnalyze(
Integer.parseInt(
properties.getProperty(
"merge_threshold_of_explain_analyze",
Integer.toString(conf.getMergeThresholdOfExplainAnalyze()))));
conf.setModeMapSizeThreshold(
Integer.parseInt(
properties.getProperty(
"mode_map_size_threshold", Integer.toString(conf.getModeMapSizeThreshold()))));
if (conf.getModeMapSizeThreshold() <= 0) {
conf.setModeMapSizeThreshold(10000);
}
conf.setMaxAllowedConcurrentQueries(
Integer.parseInt(
properties.getProperty(
"max_allowed_concurrent_queries",
Integer.toString(conf.getMaxAllowedConcurrentQueries()))));
if (conf.getMaxAllowedConcurrentQueries() <= 0) {
conf.setMaxAllowedConcurrentQueries(1000);
}
conf.setmRemoteSchemaCacheSize(
Integer.parseInt(
properties
.getProperty(
"remote_schema_cache_size", Integer.toString(conf.getmRemoteSchemaCacheSize()))
.trim()));
conf.setLanguageVersion(
properties.getProperty("language_version", conf.getLanguageVersion()).trim());
if (properties.containsKey("chunk_buffer_pool_enable")) {
conf.setChunkBufferPoolEnable(
Boolean.parseBoolean(properties.getProperty("chunk_buffer_pool_enable")));
}
conf.setCrossCompactionFileSelectionTimeBudget(
Long.parseLong(
properties.getProperty(
"cross_compaction_file_selection_time_budget",
Long.toString(conf.getCrossCompactionFileSelectionTimeBudget()))));
conf.setMergeIntervalSec(
Long.parseLong(
properties.getProperty(
"merge_interval_sec", Long.toString(conf.getMergeIntervalSec()))));
conf.setCompactionThreadCount(
Integer.parseInt(
properties.getProperty(
"compaction_thread_count", Integer.toString(conf.getCompactionThreadCount()))));
conf.setChunkMetadataSizeProportion(
Double.parseDouble(
properties.getProperty(
"chunk_metadata_size_proportion",
Double.toString(conf.getChunkMetadataSizeProportion()))));
conf.setTargetCompactionFileSize(
Long.parseLong(
properties.getProperty(
"target_compaction_file_size", Long.toString(conf.getTargetCompactionFileSize()))));
conf.setTargetChunkSize(
Long.parseLong(
properties.getProperty("target_chunk_size", Long.toString(conf.getTargetChunkSize()))));
conf.setTargetChunkPointNum(
Long.parseLong(
properties.getProperty(
"target_chunk_point_num", Long.toString(conf.getTargetChunkPointNum()))));
conf.setChunkPointNumLowerBoundInCompaction(
Long.parseLong(
properties.getProperty(
"chunk_point_num_lower_bound_in_compaction",
Long.toString(conf.getChunkPointNumLowerBoundInCompaction()))));
conf.setChunkSizeLowerBoundInCompaction(
Long.parseLong(
properties.getProperty(
"chunk_size_lower_bound_in_compaction",
Long.toString(conf.getChunkSizeLowerBoundInCompaction()))));
conf.setFileLimitPerInnerTask(
Integer.parseInt(
properties.getProperty(
"max_inner_compaction_candidate_file_num",
Integer.toString(conf.getFileLimitPerInnerTask()))));
conf.setFileLimitPerCrossTask(
Integer.parseInt(
properties.getProperty(
"max_cross_compaction_candidate_file_num",
Integer.toString(conf.getFileLimitPerCrossTask()))));
conf.setMaxCrossCompactionCandidateFileSize(
Long.parseLong(
properties.getProperty(
"max_cross_compaction_candidate_file_size",
Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
conf.setMinCrossCompactionUnseqFileLevel(
Integer.parseInt(
properties.getProperty(
"min_cross_compaction_unseq_file_level",
Integer.toString(conf.getMinCrossCompactionUnseqFileLevel()))));
conf.setCompactionWriteThroughputMbPerSec(
Integer.parseInt(
properties.getProperty(
"compaction_write_throughput_mb_per_sec",
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
conf.setCompactionReadThroughputMbPerSec(
Integer.parseInt(
properties.getProperty(
"compaction_read_throughput_mb_per_sec",
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
conf.setCompactionReadOperationPerSec(
Integer.parseInt(
properties.getProperty(
"compaction_read_operation_per_sec",
Integer.toString(conf.getCompactionReadOperationPerSec()))));
conf.setEnableTsFileValidation(
Boolean.parseBoolean(
properties.getProperty(
"enable_tsfile_validation", String.valueOf(conf.isEnableTsFileValidation()))));
conf.setCandidateCompactionTaskQueueSize(
Integer.parseInt(
properties.getProperty(
"candidate_compaction_task_queue_size",
Integer.toString(conf.getCandidateCompactionTaskQueueSize()))));
conf.setInnerCompactionTaskSelectionDiskRedundancy(
Double.parseDouble(
properties.getProperty(
"inner_compaction_task_selection_disk_redundancy",
Double.toString(conf.getInnerCompactionTaskSelectionDiskRedundancy()))));
conf.setInnerCompactionTaskSelectionModsFileThreshold(
Long.parseLong(
properties.getProperty(
"inner_compaction_task_selection_mods_file_threshold",
Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold()))));
conf.setEnablePartialInsert(
Boolean.parseBoolean(
properties.getProperty(
"enable_partial_insert", String.valueOf(conf.isEnablePartialInsert()))));
conf.setEnable13DataInsertAdapt(
Boolean.parseBoolean(
properties.getProperty(
"0.13_data_insert_adapt", String.valueOf(conf.isEnable13DataInsertAdapt()))));
int rpcSelectorThreadNum =
Integer.parseInt(
properties.getProperty(
"dn_rpc_selector_thread_count",
Integer.toString(conf.getRpcSelectorThreadCount()).trim()));
if (rpcSelectorThreadNum <= 0) {
rpcSelectorThreadNum = 1;
}
conf.setRpcSelectorThreadCount(rpcSelectorThreadNum);
int minConcurrentClientNum =
Integer.parseInt(
properties.getProperty(
"dn_rpc_min_concurrent_client_num",
Integer.toString(conf.getRpcMinConcurrentClientNum()).trim()));
if (minConcurrentClientNum <= 0) {
minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
}
conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
int maxConcurrentClientNum =
Integer.parseInt(
properties.getProperty(
"dn_rpc_max_concurrent_client_num",
Integer.toString(conf.getRpcMaxConcurrentClientNum()).trim()));
if (maxConcurrentClientNum <= 0) {
maxConcurrentClientNum = 65535;
}
conf.setRpcMaxConcurrentClientNum(maxConcurrentClientNum);
loadAutoCreateSchemaProps(properties);
conf.setTsFileStorageFs(
properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().toString()));
conf.setEnableHDFS(
Boolean.parseBoolean(
properties.getProperty("enable_hdfs", String.valueOf(conf.isEnableHDFS()))));
conf.setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
conf.setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
conf.setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
conf.setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
conf.setDfsNameServices(properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
conf.setDfsHaNamenodes(
properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
conf.setDfsHaAutomaticFailoverEnabled(
Boolean.parseBoolean(
properties.getProperty(
"dfs_ha_automatic_failover_enabled",
String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
conf.setDfsClientFailoverProxyProvider(
properties.getProperty(
"dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
conf.setUseKerberos(
Boolean.parseBoolean(
properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
conf.setKerberosKeytabFilePath(
properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
conf.setKerberosPrincipal(
properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
// The default fill interval in LinearFill and PreviousFill
conf.setDefaultFillInterval(
Integer.parseInt(
properties.getProperty(
"default_fill_interval", String.valueOf(conf.getDefaultFillInterval()))));
conf.setTagAttributeFlushInterval(
Integer.parseInt(
properties.getProperty(
"tag_attribute_flush_interval",
String.valueOf(conf.getTagAttributeFlushInterval()))));
conf.setPrimitiveArraySize(
(Integer.parseInt(
properties.getProperty(
"primitive_array_size", String.valueOf(conf.getPrimitiveArraySize())))));
conf.setThriftMaxFrameSize(
Integer.parseInt(
properties.getProperty(
"dn_thrift_max_frame_size", String.valueOf(conf.getThriftMaxFrameSize()))));
if (conf.getThriftMaxFrameSize() < IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2) {
conf.setThriftMaxFrameSize(IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2);
}
conf.setThriftDefaultBufferSize(
Integer.parseInt(
properties.getProperty(
"dn_thrift_init_buffer_size", String.valueOf(conf.getThriftDefaultBufferSize()))));
conf.setSlowQueryThreshold(
Long.parseLong(
properties.getProperty(
"slow_query_threshold", String.valueOf(conf.getSlowQueryThreshold()))));
conf.setDataRegionNum(
Integer.parseInt(
properties.getProperty("data_region_num", String.valueOf(conf.getDataRegionNum()))));
conf.setRecoveryLogIntervalInMs(
Long.parseLong(
properties.getProperty(
"recovery_log_interval_in_ms", String.valueOf(conf.getRecoveryLogIntervalInMs()))));
conf.setEnableSeparateData(
Boolean.parseBoolean(
properties.getProperty(
"enable_separate_data", Boolean.toString(conf.isEnableSeparateData()))));
conf.setWindowEvaluationThreadCount(
Integer.parseInt(
properties.getProperty(
"window_evaluation_thread_count",
Integer.toString(conf.getWindowEvaluationThreadCount()))));
if (conf.getWindowEvaluationThreadCount() <= 0) {
conf.setWindowEvaluationThreadCount(Runtime.getRuntime().availableProcessors());
}
conf.setMaxPendingWindowEvaluationTasks(
Integer.parseInt(
properties.getProperty(
"max_pending_window_evaluation_tasks",
Integer.toString(conf.getMaxPendingWindowEvaluationTasks()))));
if (conf.getMaxPendingWindowEvaluationTasks() <= 0) {
conf.setMaxPendingWindowEvaluationTasks(64);
}
conf.setCachedMNodeSizeInPBTreeMode(
Integer.parseInt(
properties.getProperty(
"cached_mnode_size_in_pbtree_mode",
String.valueOf(conf.getCachedMNodeSizeInPBTreeMode()))));
conf.setMinimumSegmentInPBTree(
Short.parseShort(
properties.getProperty(
"minimum_pbtree_segment_in_bytes",
String.valueOf(conf.getMinimumSegmentInPBTree()))));
conf.setPageCacheSizeInPBTree(
Integer.parseInt(
properties.getProperty(
"page_cache_in_pbtree", String.valueOf(conf.getPageCacheSizeInPBTree()))));
conf.setPBTreeLogSize(
Integer.parseInt(
properties.getProperty("pbtree_log_size", String.valueOf(conf.getPBTreeLogSize()))));
conf.setMaxMeasurementNumOfInternalRequest(
Integer.parseInt(
properties.getProperty(
"max_measurement_num_of_internal_request",
String.valueOf(conf.getMaxMeasurementNumOfInternalRequest()))));
// mqtt
loadMqttProps(properties);
conf.setIntoOperationBufferSizeInByte(
Long.parseLong(
properties.getProperty(
"into_operation_buffer_size_in_byte",
String.valueOf(conf.getIntoOperationBufferSizeInByte()))));
conf.setSelectIntoInsertTabletPlanRowLimit(
Integer.parseInt(
properties.getProperty(
"select_into_insert_tablet_plan_row_limit",
String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
conf.setIntoOperationExecutionThreadCount(
Integer.parseInt(
properties.getProperty(
"into_operation_execution_thread_count",
String.valueOf(conf.getIntoOperationExecutionThreadCount()))));
if (conf.getIntoOperationExecutionThreadCount() <= 0) {
conf.setIntoOperationExecutionThreadCount(2);
}
conf.setMaxAllocateMemoryRatioForLoad(
Double.parseDouble(
properties.getProperty(
"max_allocate_memory_ratio_for_load",
String.valueOf(conf.getMaxAllocateMemoryRatioForLoad()))));
conf.setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(
Integer.parseInt(
properties.getProperty(
"load_tsfile_analyze_schema_batch_flush_time_series_number",
String.valueOf(conf.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber()))));
conf.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
Long.parseLong(
properties.getProperty(
"load_tsfile_analyze_schema_memory_size_in_bytes",
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
Long.parseLong(
properties.getProperty(
"load_clean_up_task_execution_delay_time_seconds",
String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
// At the same time, set TSFileConfig
List<FSType> fsTypes = new ArrayList<>();
fsTypes.add(FSType.LOCAL);
if (Boolean.parseBoolean(
properties.getProperty("enable_hdfs", String.valueOf(conf.isEnableHDFS())))) {
fsTypes.add(FSType.HDFS);
}
TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(fsTypes.toArray(new FSType[0]));
TSFileDescriptor.getInstance()
.getConfig()
.setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
TSFileDescriptor.getInstance()
.getConfig()
.setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
TSFileDescriptor.getInstance()
.getConfig()
.setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
TSFileDescriptor.getInstance()
.getConfig()
.setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
TSFileDescriptor.getInstance()
.getConfig()
.setDfsNameServices(properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
TSFileDescriptor.getInstance()
.getConfig()
.setDfsHaNamenodes(
properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
TSFileDescriptor.getInstance()
.getConfig()
.setDfsHaAutomaticFailoverEnabled(
Boolean.parseBoolean(
properties.getProperty(
"dfs_ha_automatic_failover_enabled",
String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
TSFileDescriptor.getInstance()
.getConfig()
.setDfsClientFailoverProxyProvider(
properties.getProperty(
"dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
TSFileDescriptor.getInstance()
.getConfig()
.setPatternMatchingThreshold(
Integer.parseInt(
properties.getProperty(
"pattern_matching_threshold",
String.valueOf(conf.getPatternMatchingThreshold()))));
TSFileDescriptor.getInstance()
.getConfig()
.setUseKerberos(
Boolean.parseBoolean(
properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
TSFileDescriptor.getInstance()
.getConfig()
.setKerberosKeytabFilePath(
properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
TSFileDescriptor.getInstance()
.getConfig()
.setKerberosPrincipal(
properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
conf.setCoordinatorReadExecutorSize(
Integer.parseInt(
properties.getProperty(
"coordinator_read_executor_size",
Integer.toString(conf.getCoordinatorReadExecutorSize()))));
conf.setCoordinatorWriteExecutorSize(
Integer.parseInt(
properties.getProperty(
"coordinator_write_executor_size",
Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
// Commons
commonDescriptor.loadCommonProps(properties);
commonDescriptor.initCommonConfigDir(conf.getSystemDir());
loadWALProps(properties);
// Timed flush memtable
loadTimedService(properties);
// Set tsfile-format config
loadTsFileProps(properties);
// Make RPCTransportFactory taking effect.
ZeroCopyRpcTransportFactory.reInit();
DeepCopyRpcTransportFactory.reInit();
// UDF
loadUDFProps(properties);
// Thrift ssl
initThriftSSL(properties);
// Trigger
loadTriggerProps(properties);
// CQ
loadCQProps(properties);
// Pipe
loadPipeProps(properties);
// Cluster
loadClusterProps(properties);
// Shuffle
loadShuffleProps(properties);
// Author cache
loadAuthorCache(properties);
conf.setQuotaEnable(
Boolean.parseBoolean(
properties.getProperty("quota_enable", String.valueOf(conf.isQuotaEnable()))));
// The buffer for sort operator to calculate
conf.setSortBufferSize(
Long.parseLong(
properties
.getProperty("sort_buffer_size_in_bytes", Long.toString(conf.getSortBufferSize()))
.trim()));
// tmp filePath for sort operator
conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir()));
conf.setRateLimiterType(properties.getProperty("rate_limiter_type", conf.getRateLimiterType()));
conf.setDataNodeSchemaCacheEvictionPolicy(
properties.getProperty(
"datanode_schema_cache_eviction_policy", conf.getDataNodeSchemaCacheEvictionPolicy()));
loadIoTConsensusProps(properties);
}
private void loadIoTConsensusProps(Properties properties) {
conf.setMaxLogEntriesNumPerBatch(
Integer.parseInt(
properties
.getProperty(
"data_region_iot_max_log_entries_num_per_batch",
String.valueOf(conf.getMaxLogEntriesNumPerBatch()))
.trim()));
conf.setMaxSizePerBatch(
Integer.parseInt(
properties
.getProperty(
"data_region_iot_max_size_per_batch", String.valueOf(conf.getMaxSizePerBatch()))
.trim()));
conf.setMaxPendingBatchesNum(
Integer.parseInt(
properties
.getProperty(
"data_region_iot_max_pending_batches_num",
String.valueOf(conf.getMaxPendingBatchesNum()))
.trim()));
conf.setMaxMemoryRatioForQueue(
Double.parseDouble(
properties
.getProperty(
"data_region_iot_max_memory_ratio_for_queue",
String.valueOf(conf.getMaxMemoryRatioForQueue()))
.trim()));
conf.setRegionMigrationSpeedLimitBytesPerSecond(
Long.parseLong(
properties
.getProperty(
"region_migration_speed_limit_bytes_per_second",
String.valueOf(conf.getRegionMigrationSpeedLimitBytesPerSecond()))
.trim()));
}
private void loadAuthorCache(Properties properties) {
conf.setAuthorCacheSize(
Integer.parseInt(
properties.getProperty(
"author_cache_size", String.valueOf(conf.getAuthorCacheSize()))));
conf.setAuthorCacheExpireTime(
Integer.parseInt(
properties.getProperty(
"author_cache_expire_time", String.valueOf(conf.getAuthorCacheExpireTime()))));
}
private void loadWALProps(Properties properties) {
conf.setWalMode(
WALMode.valueOf((properties.getProperty("wal_mode", conf.getWalMode().toString()))));
int maxWalNodesNum =
Integer.parseInt(
properties.getProperty(
"max_wal_nodes_num", Integer.toString(conf.getMaxWalNodesNum())));
if (maxWalNodesNum > 0) {
conf.setMaxWalNodesNum(maxWalNodesNum);
}
int walBufferSize =
Integer.parseInt(
properties.getProperty(
"wal_buffer_size_in_byte", Integer.toString(conf.getWalBufferSize())));
if (walBufferSize > 0) {
conf.setWalBufferSize(walBufferSize);
}
int walBufferQueueCapacity =
Integer.parseInt(
properties.getProperty(
"wal_buffer_queue_capacity", Integer.toString(conf.getWalBufferQueueCapacity())));
if (walBufferQueueCapacity > 0) {
conf.setWalBufferQueueCapacity(walBufferQueueCapacity);
}
loadWALHotModifiedProps(properties);
}
private void loadCompactionHotModifiedProps(Properties properties) throws InterruptedException {
// hot load compaction schedule task manager configurations
int compactionScheduleThreadNum =
Integer.parseInt(
properties.getProperty(
"compaction_schedule_thread_num",
Integer.toString(conf.getCompactionScheduleThreadNum())));
compactionScheduleThreadNum =
compactionScheduleThreadNum <= 0 ? 1 : compactionScheduleThreadNum;
conf.setCompactionScheduleThreadNum(compactionScheduleThreadNum);
CompactionScheduleTaskManager.getInstance().checkAndMayApplyConfigurationChange();
// hot load compaction task manager configurations
loadCompactionIsEnabledHotModifiedProps(properties);
boolean restartCompactionTaskManager = loadCompactionThreadCountHotModifiedProps(properties);
restartCompactionTaskManager |= loadCompactionSubTaskCountHotModifiedProps(properties);
if (restartCompactionTaskManager) {
CompactionTaskManager.getInstance().restart();
}
// hot load compaction rate limit configurations
// update merge_write_throughput_mb_per_sec
conf.setCompactionWriteThroughputMbPerSec(
Integer.parseInt(
properties.getProperty(
"merge_write_throughput_mb_per_sec",
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
// update compaction_read_operation_per_sec
conf.setCompactionReadOperationPerSec(
Integer.parseInt(
properties.getProperty(
"compaction_read_operation_per_sec",
Integer.toString(conf.getCompactionReadOperationPerSec()))));
// update compaction_read_throughput_mb_per_sec
conf.setCompactionReadThroughputMbPerSec(
Integer.parseInt(
properties.getProperty(
"compaction_read_throughput_mb_per_sec",
Integer.toString(conf.getCompactionReadThroughputMbPerSec()))));
CompactionTaskManager.getInstance()
.setCompactionReadOperationRate(conf.getCompactionReadOperationPerSec());
CompactionTaskManager.getInstance()
.setCompactionReadThroughputRate(conf.getCompactionReadThroughputMbPerSec());
CompactionTaskManager.getInstance()
.setWriteMergeRate(conf.getCompactionWriteThroughputMbPerSec());
}
private boolean loadCompactionThreadCountHotModifiedProps(Properties properties) {
int newConfigCompactionThreadCount =
Integer.parseInt(
properties.getProperty(
"compaction_thread_count", Integer.toString(conf.getCompactionThreadCount())));
if (newConfigCompactionThreadCount <= 0) {
LOGGER.error("compaction_thread_count must greater than 0");
return false;
}
if (newConfigCompactionThreadCount == conf.getCompactionThreadCount()) {
return false;
}
conf.setCompactionThreadCount(
Integer.parseInt(
properties.getProperty(
"compaction_thread_count", Integer.toString(conf.getCompactionThreadCount()))));
return true;
}
private boolean loadCompactionSubTaskCountHotModifiedProps(Properties properties) {
int newConfigSubtaskNum =
Integer.parseInt(
properties.getProperty(
"sub_compaction_thread_count", Integer.toString(conf.getSubCompactionTaskNum())));
if (newConfigSubtaskNum <= 0) {
LOGGER.error("sub_compaction_thread_count must greater than 0");
return false;
}
if (newConfigSubtaskNum == conf.getSubCompactionTaskNum()) {
return false;
}
conf.setSubCompactionTaskNum(newConfigSubtaskNum);
return true;
}
private void loadCompactionIsEnabledHotModifiedProps(Properties properties) {
boolean isCompactionEnabled =
conf.isEnableSeqSpaceCompaction()
|| conf.isEnableUnseqSpaceCompaction()
|| conf.isEnableCrossSpaceCompaction();
boolean newConfigEnableCrossSpaceCompaction =
Boolean.parseBoolean(
properties.getProperty(
"enable_cross_space_compaction",
Boolean.toString(conf.isEnableCrossSpaceCompaction())));
boolean newConfigEnableSeqSpaceCompaction =
Boolean.parseBoolean(
properties.getProperty(
"enable_seq_space_compaction",
Boolean.toString(conf.isEnableSeqSpaceCompaction())));
boolean newConfigEnableUnseqSpaceCompaction =
Boolean.parseBoolean(
properties.getProperty(
"enable_unseq_space_compaction",
Boolean.toString(conf.isEnableUnseqSpaceCompaction())));
boolean compactionEnabledInNewConfig =
newConfigEnableCrossSpaceCompaction
|| newConfigEnableSeqSpaceCompaction
|| newConfigEnableUnseqSpaceCompaction;
if (!isCompactionEnabled && compactionEnabledInNewConfig) {
LOGGER.error("Compaction cannot start in current status.");
return;
}
conf.setEnableCrossSpaceCompaction(newConfigEnableCrossSpaceCompaction);
conf.setEnableSeqSpaceCompaction(newConfigEnableSeqSpaceCompaction);
conf.setEnableUnseqSpaceCompaction(newConfigEnableUnseqSpaceCompaction);
conf.setInnerCompactionTaskSelectionDiskRedundancy(
Double.parseDouble(
properties.getProperty(
"inner_compaction_task_selection_disk_redundancy",
Double.toString(conf.getInnerCompactionTaskSelectionDiskRedundancy()))));
conf.setInnerCompactionTaskSelectionModsFileThreshold(
Long.parseLong(
properties.getProperty(
"inner_compaction_task_selection_mods_file_threshold",
Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold()))));
}
private void loadWALHotModifiedProps(Properties properties) {
long walAsyncModeFsyncDelayInMs =
Long.parseLong(
properties.getProperty(
"wal_async_mode_fsync_delay_in_ms",
Long.toString(conf.getWalAsyncModeFsyncDelayInMs())));
if (walAsyncModeFsyncDelayInMs > 0) {
conf.setWalAsyncModeFsyncDelayInMs(walAsyncModeFsyncDelayInMs);
}
long walSyncModeFsyncDelayInMs =
Long.parseLong(
properties.getProperty(
"wal_sync_mode_fsync_delay_in_ms",
Long.toString(conf.getWalSyncModeFsyncDelayInMs())));
if (walSyncModeFsyncDelayInMs > 0) {
conf.setWalSyncModeFsyncDelayInMs(walSyncModeFsyncDelayInMs);
}
long walFileSizeThreshold =
Long.parseLong(
properties.getProperty(
"wal_file_size_threshold_in_byte",
Long.toString(conf.getWalFileSizeThresholdInByte())));
if (walFileSizeThreshold > 0) {
conf.setWalFileSizeThresholdInByte(walFileSizeThreshold);
}
double walMinEffectiveInfoRatio =
Double.parseDouble(
properties.getProperty(
"wal_min_effective_info_ratio",
Double.toString(conf.getWalMinEffectiveInfoRatio())));
if (walMinEffectiveInfoRatio > 0) {
conf.setWalMinEffectiveInfoRatio(walMinEffectiveInfoRatio);
}
long walMemTableSnapshotThreshold =
Long.parseLong(
properties.getProperty(
"wal_memtable_snapshot_threshold_in_byte",
Long.toString(conf.getWalMemTableSnapshotThreshold())));
if (walMemTableSnapshotThreshold > 0) {
conf.setWalMemTableSnapshotThreshold(walMemTableSnapshotThreshold);
}
int maxWalMemTableSnapshotNum =
Integer.parseInt(
properties.getProperty(
"max_wal_memtable_snapshot_num",
Integer.toString(conf.getMaxWalMemTableSnapshotNum())));
if (maxWalMemTableSnapshotNum > 0) {
conf.setMaxWalMemTableSnapshotNum(maxWalMemTableSnapshotNum);
}
long deleteWalFilesPeriod =
Long.parseLong(
properties.getProperty(
"delete_wal_files_period_in_ms",
Long.toString(conf.getDeleteWalFilesPeriodInMs())));
if (deleteWalFilesPeriod > 0) {
conf.setDeleteWalFilesPeriodInMs(deleteWalFilesPeriod);
}
long throttleDownThresholdInByte =
Long.parseLong(
properties.getProperty(
"iot_consensus_throttle_threshold_in_byte",
Long.toString(getThrottleThresholdWithDirs())));
if (throttleDownThresholdInByte > 0) {
conf.setThrottleThreshold(throttleDownThresholdInByte);
}
long cacheWindowInMs =
Long.parseLong(
properties.getProperty(
"iot_consensus_cache_window_time_in_ms",
Long.toString(conf.getCacheWindowTimeInMs())));
if (cacheWindowInMs > 0) {
conf.setCacheWindowTimeInMs(cacheWindowInMs);
}
}
public long getThrottleThresholdWithDirs() {
ArrayList<String> dataDiskDirs = new ArrayList<>(Arrays.asList(conf.getDataDirs()));
ArrayList<String> walDiskDirs =
new ArrayList<>(Arrays.asList(commonDescriptor.getConfig().getWalDirs()));
Set<FileStore> dataFileStores = SystemMetrics.getFileStores(dataDiskDirs);
Set<FileStore> walFileStores = SystemMetrics.getFileStores(walDiskDirs);
double dirUseProportion = 0;
dataFileStores.retainAll(walFileStores);
// if there is no common disk between data and wal, use more usableSpace.
if (dataFileStores.isEmpty()) {
dirUseProportion = MAX_DIR_USE_PROPORTION;
} else {
dirUseProportion = MIN_DIR_USE_PROPORTION;
}
long newThrottleThreshold = Long.MAX_VALUE;
for (FileStore fileStore : walFileStores) {
try {
newThrottleThreshold = Math.min(newThrottleThreshold, fileStore.getUsableSpace());
} catch (IOException e) {
LOGGER.error("Failed to get file size of {}, because", fileStore, e);
}
}
newThrottleThreshold = (long) (newThrottleThreshold * dirUseProportion * walFileStores.size());
// the new throttle threshold should between MIN_THROTTLE_THRESHOLD and MAX_THROTTLE_THRESHOLD
return Math.max(Math.min(newThrottleThreshold, MAX_THROTTLE_THRESHOLD), MIN_THROTTLE_THRESHOLD);
}
private void loadAutoCreateSchemaProps(Properties properties) {
conf.setAutoCreateSchemaEnabled(
Boolean.parseBoolean(
properties.getProperty(
"enable_auto_create_schema",
Boolean.toString(conf.isAutoCreateSchemaEnabled()).trim())));
conf.setBooleanStringInferType(
TSDataType.valueOf(
properties.getProperty(
"boolean_string_infer_type", conf.getBooleanStringInferType().toString())));
conf.setIntegerStringInferType(
TSDataType.valueOf(
properties.getProperty(
"integer_string_infer_type", conf.getIntegerStringInferType().toString())));
conf.setFloatingStringInferType(
TSDataType.valueOf(
properties.getProperty(
"floating_string_infer_type", conf.getFloatingStringInferType().toString())));
conf.setNanStringInferType(
TSDataType.valueOf(
properties.getProperty(
"nan_string_infer_type", conf.getNanStringInferType().toString())));
conf.setDefaultStorageGroupLevel(
Integer.parseInt(
properties.getProperty(
"default_storage_group_level",
Integer.toString(conf.getDefaultStorageGroupLevel()))));
conf.setDefaultBooleanEncoding(
properties.getProperty(
"default_boolean_encoding", conf.getDefaultBooleanEncoding().toString()));
conf.setDefaultInt32Encoding(
properties.getProperty(
"default_int32_encoding", conf.getDefaultInt32Encoding().toString()));
conf.setDefaultInt64Encoding(
properties.getProperty(
"default_int64_encoding", conf.getDefaultInt64Encoding().toString()));
conf.setDefaultFloatEncoding(
properties.getProperty(
"default_float_encoding", conf.getDefaultFloatEncoding().toString()));
conf.setDefaultDoubleEncoding(
properties.getProperty(
"default_double_encoding", conf.getDefaultDoubleEncoding().toString()));
conf.setDefaultTextEncoding(
properties.getProperty("default_text_encoding", conf.getDefaultTextEncoding().toString()));
}
private void loadTsFileProps(Properties properties) {
TSFileDescriptor.getInstance()
.getConfig()
.setGroupSizeInByte(
Integer.parseInt(
properties.getProperty(
"group_size_in_byte",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte()))));
TSFileDescriptor.getInstance()
.getConfig()
.setPageSizeInByte(
Integer.parseInt(
properties.getProperty(
"page_size_in_byte",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()))));
if (TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
> TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte()) {
LOGGER.warn("page_size is greater than group size, will set it as the same with group size");
TSFileDescriptor.getInstance()
.getConfig()
.setPageSizeInByte(TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte());
}
TSFileDescriptor.getInstance()
.getConfig()
.setMaxNumberOfPointsInPage(
Integer.parseInt(
properties.getProperty(
"max_number_of_points_in_page",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage()))));
TSFileDescriptor.getInstance()
.getConfig()
.setMaxStringLength(
Integer.parseInt(
properties.getProperty(
"max_string_length",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getMaxStringLength()))));
TSFileDescriptor.getInstance()
.getConfig()
.setBloomFilterErrorRate(
Double.parseDouble(
properties.getProperty(
"bloom_filter_error_rate",
Double.toString(
TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate()))));
TSFileDescriptor.getInstance()
.getConfig()
.setFloatPrecision(
Integer.parseInt(
properties.getProperty(
"float_precision",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getFloatPrecision()))));
TSFileDescriptor.getInstance()
.getConfig()
.setValueEncoder(
properties.getProperty(
"value_encoder", TSFileDescriptor.getInstance().getConfig().getValueEncoder()));
TSFileDescriptor.getInstance()
.getConfig()
.setCompressor(
properties.getProperty(
"compressor",
TSFileDescriptor.getInstance().getConfig().getCompressor().toString()));
TSFileDescriptor.getInstance()
.getConfig()
.setMaxDegreeOfIndexNode(
Integer.parseInt(
properties.getProperty(
"max_degree_of_index_node",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode()))));
TSFileDescriptor.getInstance()
.getConfig()
.setMaxTsBlockSizeInBytes(
Integer.parseInt(
properties.getProperty(
"max_tsblock_size_in_bytes",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes()))));
// min(default_size, maxBytesForQuery)
TSFileDescriptor.getInstance()
.getConfig()
.setMaxTsBlockSizeInBytes(
(int)
Math.min(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
conf.getMaxBytesPerFragmentInstance()));
TSFileDescriptor.getInstance()
.getConfig()
.setMaxTsBlockLineNumber(
Integer.parseInt(
properties.getProperty(
"max_tsblock_line_number",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()))));
}
// Mqtt related
private void loadMqttProps(Properties properties) {
conf.setMqttDir(properties.getProperty("mqtt_root_dir", conf.getMqttDir()));
if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) {
conf.setMqttHost(properties.getProperty(IoTDBConstant.MQTT_HOST_NAME));
} else {
LOGGER.info("MQTT host is not configured, will use dn_rpc_address.");
conf.setMqttHost(
properties.getProperty(IoTDBConstant.DN_RPC_ADDRESS, conf.getRpcAddress().trim()));
}
if (properties.getProperty(IoTDBConstant.MQTT_PORT_NAME) != null) {
conf.setMqttPort(Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_PORT_NAME)));
}
if (properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME) != null) {
conf.setMqttHandlerPoolSize(
Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME)));
}
if (properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME) != null) {
conf.setMqttPayloadFormatter(
properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME));
}
if (properties.getProperty(IoTDBConstant.ENABLE_MQTT) != null) {
conf.setEnableMQTTService(
Boolean.parseBoolean(properties.getProperty(IoTDBConstant.ENABLE_MQTT)));
}
if (properties.getProperty(IoTDBConstant.MQTT_MAX_MESSAGE_SIZE) != null) {
conf.setMqttMaxMessageSize(
Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_MAX_MESSAGE_SIZE)));
}
}
// timed flush memtable
private void loadTimedService(Properties properties) {
conf.setEnableTimedFlushSeqMemtable(
Boolean.parseBoolean(
properties.getProperty(
"enable_timed_flush_seq_memtable",
Boolean.toString(conf.isEnableTimedFlushSeqMemtable()))));
long seqMemTableFlushInterval =
Long.parseLong(
properties
.getProperty(
"seq_memtable_flush_interval_in_ms",
Long.toString(conf.getSeqMemtableFlushInterval()))
.trim());
if (seqMemTableFlushInterval > 0) {
conf.setSeqMemtableFlushInterval(seqMemTableFlushInterval);
}
long seqMemTableFlushCheckInterval =
Long.parseLong(
properties
.getProperty(
"seq_memtable_flush_check_interval_in_ms",
Long.toString(conf.getSeqMemtableFlushCheckInterval()))
.trim());
if (seqMemTableFlushCheckInterval > 0) {
conf.setSeqMemtableFlushCheckInterval(seqMemTableFlushCheckInterval);
}
conf.setEnableTimedFlushUnseqMemtable(
Boolean.parseBoolean(
properties.getProperty(
"enable_timed_flush_unseq_memtable",
Boolean.toString(conf.isEnableTimedFlushUnseqMemtable()))));
long unseqMemTableFlushInterval =
Long.parseLong(
properties
.getProperty(
"unseq_memtable_flush_interval_in_ms",
Long.toString(conf.getUnseqMemtableFlushInterval()))
.trim());
if (unseqMemTableFlushInterval > 0) {
conf.setUnseqMemtableFlushInterval(unseqMemTableFlushInterval);
}
long unseqMemTableFlushCheckInterval =
Long.parseLong(
properties
.getProperty(
"unseq_memtable_flush_check_interval_in_ms",
Long.toString(conf.getUnseqMemtableFlushCheckInterval()))
.trim());
if (unseqMemTableFlushCheckInterval > 0) {
conf.setUnseqMemtableFlushCheckInterval(unseqMemTableFlushCheckInterval);
}
}
private String[][] parseDataDirs(String dataDirs) {
String[] tiers = dataDirs.split(IoTDBConstant.TIER_SEPARATOR);
String[][] tierDataDirs = new String[tiers.length][];
for (int i = 0; i < tiers.length; ++i) {
tierDataDirs[i] = tiers[i].split(",");
}
return tierDataDirs;
}
public void loadHotModifiedProps(Properties properties) throws QueryProcessException {
try {
// update data dirs
String dataDirs = properties.getProperty("dn_data_dirs", null);
if (dataDirs != null) {
conf.reloadDataDirs(parseDataDirs(dataDirs));
}
// update dir strategy
String multiDirStrategyClassName = properties.getProperty("dn_multi_dir_strategy", null);
if (multiDirStrategyClassName != null
&& !multiDirStrategyClassName.equals(conf.getMultiDirStrategyClassName())) {
conf.setMultiDirStrategyClassName(multiDirStrategyClassName);
conf.confirmMultiDirStrategy();
}
TierManager.getInstance().resetFolders();
// update timed flush & close conf
loadTimedService(properties);
StorageEngine.getInstance().rebootTimedService();
// update params of creating schema automatically
loadAutoCreateSchemaProps(properties);
// update tsfile-format config
loadTsFileProps(properties);
// update slow_query_threshold
conf.setSlowQueryThreshold(
Long.parseLong(
properties.getProperty(
"slow_query_threshold", Long.toString(conf.getSlowQueryThreshold()))));
// update select into operation max buffer size
conf.setIntoOperationBufferSizeInByte(
Long.parseLong(
properties.getProperty(
"into_operation_buffer_size_in_byte",
String.valueOf(conf.getIntoOperationBufferSizeInByte()))));
// update insert-tablet-plan's row limit for select-into
conf.setSelectIntoInsertTabletPlanRowLimit(
Integer.parseInt(
properties.getProperty(
"select_into_insert_tablet_plan_row_limit",
String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
// update enable query memory estimation for memory control
conf.setEnableQueryMemoryEstimation(
Boolean.parseBoolean(
properties.getProperty(
"enable_query_memory_estimation",
Boolean.toString(conf.isEnableQueryMemoryEstimation()))));
conf.setEnableTsFileValidation(
Boolean.parseBoolean(
properties.getProperty(
"enable_tsfile_validation", String.valueOf(conf.isEnableTsFileValidation()))));
// update wal config
long prevDeleteWalFilesPeriodInMs = conf.getDeleteWalFilesPeriodInMs();
loadWALHotModifiedProps(properties);
if (prevDeleteWalFilesPeriodInMs != conf.getDeleteWalFilesPeriodInMs()) {
WALManager.getInstance().rebootWALDeleteThread();
}
// update compaction config
loadCompactionHotModifiedProps(properties);
// update load config
conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
Long.parseLong(
properties.getProperty(
"load_clean_up_task_execution_delay_time_seconds",
String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
// update merge_threshold_of_explain_analyze
conf.setMergeThresholdOfExplainAnalyze(
Integer.parseInt(
properties.getProperty(
"merge_threshold_of_explain_analyze",
String.valueOf(conf.getMergeThresholdOfExplainAnalyze()))));
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
}
}
public void loadHotModifiedProps() throws QueryProcessException {
URL url = getPropsUrl(CommonConfig.CONFIG_NAME);
if (url == null) {
LOGGER.warn("Couldn't load the configuration from any of the known sources.");
return;
}
Properties commonProperties = new Properties();
try (InputStream inputStream = url.openStream()) {
LOGGER.info("Start to reload config file {}", url);
commonProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
} catch (Exception e) {
LOGGER.warn("Fail to reload config file {}", url, e);
throw new QueryProcessException(
String.format("Fail to reload config file %s because %s", url, e.getMessage()));
}
url = getPropsUrl(IoTDBConfig.CONFIG_NAME);
if (url == null) {
LOGGER.warn("Couldn't load the configuration from any of the known sources.");
return;
}
try (InputStream inputStream = url.openStream()) {
LOGGER.info("Start to reload config file {}", url);
Properties properties = new Properties();
properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
commonProperties.putAll(properties);
loadHotModifiedProps(commonProperties);
} catch (Exception e) {
LOGGER.warn("Fail to reload config file {}", url, e);
throw new QueryProcessException(
String.format("Fail to reload config file %s because %s", url, e.getMessage()));
}
ReloadLevel reloadLevel = MetricConfigDescriptor.getInstance().loadHotProps(commonProperties);
LOGGER.info("Reload metric service in level {}", reloadLevel);
if (reloadLevel == ReloadLevel.RESTART_INTERNAL_REPORTER) {
IoTDBInternalReporter internalReporter;
if (MetricConfigDescriptor.getInstance().getMetricConfig().getInternalReportType()
== InternalReporterType.IOTDB) {
internalReporter = new IoTDBInternalLocalReporter();
} else {
internalReporter = new IoTDBInternalMemoryReporter();
}
MetricService.getInstance().reloadInternalReporter(internalReporter);
} else {
MetricService.getInstance().reloadService(reloadLevel);
}
}
private void initMemoryAllocate(Properties properties) {
String memoryAllocateProportion = properties.getProperty("datanode_memory_proportion", null);
if (memoryAllocateProportion == null) {
memoryAllocateProportion =
properties.getProperty("storage_query_schema_consensus_free_memory_proportion");
if (memoryAllocateProportion != null) {
LOGGER.warn(
"The parameter storage_query_schema_consensus_free_memory_proportion is deprecated since v1.2.3, "
+ "please use datanode_memory_proportion instead.");
}
}
if (memoryAllocateProportion != null) {
String[] proportions = memoryAllocateProportion.split(":");
int proportionSum = 0;
for (String proportion : proportions) {
proportionSum += Integer.parseInt(proportion.trim());
}
long maxMemoryAvailable = Runtime.getRuntime().maxMemory();
if (proportionSum != 0) {
conf.setAllocateMemoryForStorageEngine(
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
conf.setAllocateMemoryForRead(
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
conf.setAllocateMemoryForSchema(
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
conf.setAllocateMemoryForConsensus(
maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum);
// if pipe proportion is set, use it, otherwise use the default value
if (proportions.length >= 6) {
conf.setAllocateMemoryForPipe(
maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / proportionSum);
} else {
conf.setAllocateMemoryForPipe(
(maxMemoryAvailable
- (conf.getAllocateMemoryForStorageEngine()
+ conf.getAllocateMemoryForRead()
+ conf.getAllocateMemoryForSchema()
+ conf.getAllocateMemoryForConsensus()))
/ 2);
}
}
}
LOGGER.info("initial allocateMemoryForRead = {}", conf.getAllocateMemoryForRead());
LOGGER.info("initial allocateMemoryForWrite = {}", conf.getAllocateMemoryForStorageEngine());
LOGGER.info("initial allocateMemoryForSchema = {}", conf.getAllocateMemoryForSchema());
LOGGER.info("initial allocateMemoryForConsensus = {}", conf.getAllocateMemoryForConsensus());
LOGGER.info("initial allocateMemoryForPipe = {}", conf.getAllocateMemoryForPipe());
initSchemaMemoryAllocate(properties);
initStorageEngineAllocate(properties);
conf.setEnableQueryMemoryEstimation(
Boolean.parseBoolean(
properties.getProperty(
"enable_query_memory_estimation",
Boolean.toString(conf.isEnableQueryMemoryEstimation()))));
String queryMemoryAllocateProportion =
properties.getProperty("chunk_timeseriesmeta_free_memory_proportion");
if (queryMemoryAllocateProportion != null) {
String[] proportions = queryMemoryAllocateProportion.split(":");
int proportionSum = 0;
for (String proportion : proportions) {
proportionSum += Integer.parseInt(proportion.trim());
}
long maxMemoryAvailable = conf.getAllocateMemoryForRead();
if (proportionSum != 0) {
try {
conf.setAllocateMemoryForBloomFilterCache(
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
conf.setAllocateMemoryForChunkCache(
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
conf.setAllocateMemoryForTimeSeriesMetaDataCache(
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
conf.setAllocateMemoryForCoordinator(
maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum);
conf.setAllocateMemoryForOperators(
maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / proportionSum);
conf.setAllocateMemoryForDataExchange(
maxMemoryAvailable * Integer.parseInt(proportions[5].trim()) / proportionSum);
conf.setAllocateMemoryForTimeIndex(
maxMemoryAvailable * Integer.parseInt(proportions[6].trim()) / proportionSum);
} catch (Exception e) {
throw new RuntimeException(
"Each subsection of configuration item chunkmeta_chunk_timeseriesmeta_free_memory_proportion"
+ " should be an integer, which is "
+ queryMemoryAllocateProportion);
}
}
}
// metadata cache is disabled, we need to move all their allocated memory to other parts
if (!conf.isMetaDataCacheEnable()) {
long sum =
conf.getAllocateMemoryForBloomFilterCache()
+ conf.getAllocateMemoryForChunkCache()
+ conf.getAllocateMemoryForTimeSeriesMetaDataCache();
conf.setAllocateMemoryForBloomFilterCache(0);
conf.setAllocateMemoryForChunkCache(0);
conf.setAllocateMemoryForTimeSeriesMetaDataCache(0);
long partForDataExchange = sum / 2;
long partForOperators = sum - partForDataExchange;
conf.setAllocateMemoryForDataExchange(
conf.getAllocateMemoryForDataExchange() + partForDataExchange);
conf.setAllocateMemoryForOperators(conf.getAllocateMemoryForOperators() + partForOperators);
}
}
private void initStorageEngineAllocate(Properties properties) {
long storageMemoryTotal = conf.getAllocateMemoryForStorageEngine();
String valueOfStorageEngineMemoryProportion =
properties.getProperty("storage_engine_memory_proportion");
if (valueOfStorageEngineMemoryProportion != null) {
String[] storageProportionArray = valueOfStorageEngineMemoryProportion.split(":");
int storageEngineMemoryProportion = 0;
for (String proportion : storageProportionArray) {
int proportionValue = Integer.parseInt(proportion.trim());
if (proportionValue <= 0) {
LOGGER.warn(
"The value of storage_engine_memory_proportion is illegal, use default value 8:2 .");
return;
}
storageEngineMemoryProportion += proportionValue;
}
conf.setCompactionProportion(
(double) Integer.parseInt(storageProportionArray[1].trim())
/ (double) storageEngineMemoryProportion);
String valueOfWriteMemoryProportion = properties.getProperty("write_memory_proportion");
if (valueOfWriteMemoryProportion != null) {
String[] writeProportionArray = valueOfWriteMemoryProportion.split(":");
int writeMemoryProportion = 0;
for (String proportion : writeProportionArray) {
int proportionValue = Integer.parseInt(proportion.trim());
writeMemoryProportion += proportionValue;
if (proportionValue <= 0) {
LOGGER.warn(
"The value of write_memory_proportion is illegal, use default value 19:1 .");
return;
}
}
double writeAllProportionOfStorageEngineMemory =
(double) Integer.parseInt(storageProportionArray[0].trim())
/ storageEngineMemoryProportion;
double memTableProportion =
(double) Integer.parseInt(writeProportionArray[0].trim()) / writeMemoryProportion;
double timePartitionInfoProportion =
(double) Integer.parseInt(writeProportionArray[1].trim()) / writeMemoryProportion;
// writeProportionForMemtable = 8/10 * 19/20 = 0.76 default
conf.setWriteProportionForMemtable(
writeAllProportionOfStorageEngineMemory * memTableProportion);
// allocateMemoryForTimePartitionInfo = storageMemoryTotal * 8/10 * 1/20 default
conf.setAllocateMemoryForTimePartitionInfo(
(long)
((writeAllProportionOfStorageEngineMemory * timePartitionInfoProportion)
* storageMemoryTotal));
}
}
}
private void initSchemaMemoryAllocate(Properties properties) {
long schemaMemoryTotal = conf.getAllocateMemoryForSchema();
String schemaMemoryPortionInput = properties.getProperty("schema_memory_proportion");
if (schemaMemoryPortionInput != null) {
String[] proportions = schemaMemoryPortionInput.split(":");
int loadedProportionSum = 0;
for (String proportion : proportions) {
loadedProportionSum += Integer.parseInt(proportion.trim());
}
if (loadedProportionSum != 0) {
conf.setSchemaMemoryProportion(
new int[] {
Integer.parseInt(proportions[0].trim()),
Integer.parseInt(proportions[1].trim()),
Integer.parseInt(proportions[2].trim())
});
}
} else {
schemaMemoryPortionInput = properties.getProperty("schema_memory_allocate_proportion");
if (schemaMemoryPortionInput != null) {
String[] proportions = schemaMemoryPortionInput.split(":");
int loadedProportionSum = 0;
for (String proportion : proportions) {
loadedProportionSum += Integer.parseInt(proportion.trim());
}
if (loadedProportionSum != 0) {
conf.setSchemaMemoryProportion(
new int[] {
Integer.parseInt(proportions[0].trim()),
Integer.parseInt(proportions[1].trim()) + Integer.parseInt(proportions[3].trim()),
Integer.parseInt(proportions[2].trim())
});
}
}
}
int proportionSum = 0;
for (int proportion : conf.getSchemaMemoryProportion()) {
proportionSum += proportion;
}
conf.setAllocateMemoryForSchemaRegion(
schemaMemoryTotal * conf.getSchemaMemoryProportion()[0] / proportionSum);
LOGGER.info("allocateMemoryForSchemaRegion = {}", conf.getAllocateMemoryForSchemaRegion());
conf.setAllocateMemoryForSchemaCache(
schemaMemoryTotal * conf.getSchemaMemoryProportion()[1] / proportionSum);
LOGGER.info("allocateMemoryForSchemaCache = {}", conf.getAllocateMemoryForSchemaCache());
conf.setAllocateMemoryForPartitionCache(
schemaMemoryTotal * conf.getSchemaMemoryProportion()[2] / proportionSum);
LOGGER.info("allocateMemoryForPartitionCache = {}", conf.getAllocateMemoryForPartitionCache());
}
@SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
private void loadUDFProps(Properties properties) {
String initialByteArrayLengthForMemoryControl =
properties.getProperty("udf_initial_byte_array_length_for_memory_control");
if (initialByteArrayLengthForMemoryControl != null) {
conf.setUdfInitialByteArrayLengthForMemoryControl(
Integer.parseInt(initialByteArrayLengthForMemoryControl));
}
conf.setUdfDir(properties.getProperty("udf_lib_dir", conf.getUdfDir()));
String memoryBudgetInMb = properties.getProperty("udf_memory_budget_in_mb");
if (memoryBudgetInMb != null) {
conf.setUdfMemoryBudgetInMB(
(float)
Math.min(Float.parseFloat(memoryBudgetInMb), 0.2 * conf.getAllocateMemoryForRead()));
}
String readerTransformerCollectorMemoryProportion =
properties.getProperty("udf_reader_transformer_collector_memory_proportion");
if (readerTransformerCollectorMemoryProportion != null) {
String[] proportions = readerTransformerCollectorMemoryProportion.split(":");
int proportionSum = 0;
for (String proportion : proportions) {
proportionSum += Integer.parseInt(proportion.trim());
}
float maxMemoryAvailable = conf.getUdfMemoryBudgetInMB();
try {
conf.setUdfReaderMemoryBudgetInMB(
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
conf.setUdfTransformerMemoryBudgetInMB(
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
conf.setUdfCollectorMemoryBudgetInMB(
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
} catch (Exception e) {
throw new RuntimeException(
"Each subsection of configuration item udf_reader_transformer_collector_memory_proportion"
+ " should be an integer, which is "
+ readerTransformerCollectorMemoryProportion);
}
}
}
private void initThriftSSL(Properties properties) {
conf.setEnableSSL(
Boolean.parseBoolean(
properties.getProperty("enable_thrift_ssl", Boolean.toString(conf.isEnableSSL()))));
conf.setKeyStorePath(properties.getProperty("key_store_path", conf.getKeyStorePath()).trim());
conf.setKeyStorePwd(properties.getProperty("key_store_pwd", conf.getKeyStorePath()).trim());
}
private void loadTriggerProps(Properties properties) {
conf.setTriggerDir(properties.getProperty("trigger_lib_dir", conf.getTriggerDir()));
conf.setRetryNumToFindStatefulTrigger(
Integer.parseInt(
properties.getProperty(
"stateful_trigger_retry_num_when_not_found",
Integer.toString(conf.getRetryNumToFindStatefulTrigger()))));
int tlogBufferSize =
Integer.parseInt(
properties.getProperty("tlog_buffer_size", Integer.toString(conf.getTlogBufferSize())));
if (tlogBufferSize > 0) {
conf.setTlogBufferSize(tlogBufferSize);
}
conf.setTriggerForwardMaxQueueNumber(
Integer.parseInt(
properties.getProperty(
"trigger_forward_max_queue_number",
Integer.toString(conf.getTriggerForwardMaxQueueNumber()))));
conf.setTriggerForwardMaxSizePerQueue(
Integer.parseInt(
properties.getProperty(
"trigger_forward_max_size_per_queue",
Integer.toString(conf.getTriggerForwardMaxSizePerQueue()))));
conf.setTriggerForwardBatchSize(
Integer.parseInt(
properties.getProperty(
"trigger_forward_batch_size",
Integer.toString(conf.getTriggerForwardBatchSize()))));
conf.setTriggerForwardHTTPPoolSize(
Integer.parseInt(
properties.getProperty(
"trigger_forward_http_pool_size",
Integer.toString(conf.getTriggerForwardHTTPPoolSize()))));
conf.setTriggerForwardHTTPPOOLMaxPerRoute(
Integer.parseInt(
properties.getProperty(
"trigger_forward_http_pool_max_per_route",
Integer.toString(conf.getTriggerForwardHTTPPOOLMaxPerRoute()))));
conf.setTriggerForwardMQTTPoolSize(
Integer.parseInt(
properties.getProperty(
"trigger_forward_mqtt_pool_size",
Integer.toString(conf.getTriggerForwardMQTTPoolSize()))));
}
private void loadPipeProps(Properties properties) {
conf.setPipeLibDir(properties.getProperty("pipe_lib_dir", conf.getPipeLibDir()));
conf.setPipeReceiverFileDirs(
Arrays.stream(
properties
.getProperty(
"pipe_receiver_file_dirs", String.join(",", conf.getPipeReceiverFileDirs()))
.trim()
.split(","))
.filter(dir -> !dir.isEmpty())
.toArray(String[]::new));
}
private void loadCQProps(Properties properties) {
conf.setContinuousQueryThreadNum(
Integer.parseInt(
properties.getProperty(
"continuous_query_thread_num",
Integer.toString(conf.getContinuousQueryThreadNum()))));
if (conf.getContinuousQueryThreadNum() <= 0) {
conf.setContinuousQueryThreadNum(Runtime.getRuntime().availableProcessors() / 2);
}
conf.setContinuousQueryMinimumEveryInterval(
DateTimeUtils.convertDurationStrToLong(
properties.getProperty("continuous_query_minimum_every_interval", "1s"),
CommonDescriptor.getInstance().getConfig().getTimestampPrecision(),
false));
}
public void loadClusterProps(Properties properties) throws IOException {
String configNodeUrls = properties.getProperty(IoTDBConstant.DN_SEED_CONFIG_NODE);
if (configNodeUrls == null) {
configNodeUrls = properties.getProperty(IoTDBConstant.DN_TARGET_CONFIG_NODE_LIST);
LOGGER.warn(
"The parameter dn_target_config_node_list has been abandoned, "
+ "only the first ConfigNode address will be used to join in the cluster. "
+ "Please use dn_seed_config_node instead.");
}
if (configNodeUrls != null) {
try {
configNodeUrls = configNodeUrls.trim();
conf.setSeedConfigNode(NodeUrlUtils.parseTEndPointUrls(configNodeUrls).get(0));
} catch (BadNodeUrlException e) {
LOGGER.error("ConfigNodes are set in wrong format, please set them like 127.0.0.1:10710");
}
} else {
throw new IOException(
"The parameter dn_seed_config_node is not set, this DataNode will not join in any cluster.");
}
conf.setInternalAddress(
properties
.getProperty(IoTDBConstant.DN_INTERNAL_ADDRESS, conf.getInternalAddress())
.trim());
conf.setInternalPort(
Integer.parseInt(
properties
.getProperty(
IoTDBConstant.DN_INTERNAL_PORT, Integer.toString(conf.getInternalPort()))
.trim()));
conf.setDataRegionConsensusPort(
Integer.parseInt(
properties
.getProperty(
"dn_data_region_consensus_port",
Integer.toString(conf.getDataRegionConsensusPort()))
.trim()));
conf.setSchemaRegionConsensusPort(
Integer.parseInt(
properties
.getProperty(
"dn_schema_region_consensus_port",
Integer.toString(conf.getSchemaRegionConsensusPort()))
.trim()));
conf.setJoinClusterRetryIntervalMs(
Long.parseLong(
properties
.getProperty(
"dn_join_cluster_retry_interval_ms",
Long.toString(conf.getJoinClusterRetryIntervalMs()))
.trim()));
}
public void loadShuffleProps(Properties properties) {
conf.setMppDataExchangePort(
Integer.parseInt(
properties.getProperty(
"dn_mpp_data_exchange_port", Integer.toString(conf.getMppDataExchangePort()))));
conf.setMppDataExchangeCorePoolSize(
Integer.parseInt(
properties.getProperty(
"mpp_data_exchange_core_pool_size",
Integer.toString(conf.getMppDataExchangeCorePoolSize()))));
conf.setMppDataExchangeMaxPoolSize(
Integer.parseInt(
properties.getProperty(
"mpp_data_exchange_max_pool_size",
Integer.toString(conf.getMppDataExchangeMaxPoolSize()))));
conf.setMppDataExchangeKeepAliveTimeInMs(
Integer.parseInt(
properties.getProperty(
"mpp_data_exchange_keep_alive_time_in_ms",
Integer.toString(conf.getMppDataExchangeKeepAliveTimeInMs()))));
conf.setPartitionCacheSize(
Integer.parseInt(
properties.getProperty(
"partition_cache_size", Integer.toString(conf.getPartitionCacheSize()))));
conf.setDriverTaskExecutionTimeSliceInMs(
Integer.parseInt(
properties.getProperty(
"driver_task_execution_time_slice_in_ms",
Integer.toString(conf.getDriverTaskExecutionTimeSliceInMs()))));
}
/** Get default encode algorithm by data type */
public TSEncoding getDefaultEncodingByType(TSDataType dataType) {
switch (dataType) {
case BOOLEAN:
return conf.getDefaultBooleanEncoding();
case INT32:
return conf.getDefaultInt32Encoding();
case INT64:
return conf.getDefaultInt64Encoding();
case FLOAT:
return conf.getDefaultFloatEncoding();
case DOUBLE:
return conf.getDefaultDoubleEncoding();
default:
return conf.getDefaultTextEncoding();
}
}
// These configurations are received from config node when registering
public void loadGlobalConfig(TGlobalConfig globalConfig) {
conf.setSeriesPartitionExecutorClass(globalConfig.getSeriesPartitionExecutorClass());
conf.setSeriesPartitionSlotNum(globalConfig.getSeriesPartitionSlotNum());
conf.setReadConsistencyLevel(globalConfig.getReadConsistencyLevel());
}
public void loadRatisConfig(TRatisConfig ratisConfig) {
conf.setDataRatisConsensusLogAppenderBufferSizeMax(ratisConfig.getDataAppenderBufferSize());
conf.setSchemaRatisConsensusLogAppenderBufferSizeMax(ratisConfig.getSchemaAppenderBufferSize());
conf.setDataRatisConsensusSnapshotTriggerThreshold(
ratisConfig.getDataSnapshotTriggerThreshold());
conf.setSchemaRatisConsensusSnapshotTriggerThreshold(
ratisConfig.getSchemaSnapshotTriggerThreshold());
conf.setDataRatisConsensusLogUnsafeFlushEnable(ratisConfig.isDataLogUnsafeFlushEnable());
conf.setSchemaRatisConsensusLogUnsafeFlushEnable(ratisConfig.isSchemaLogUnsafeFlushEnable());
conf.setDataRatisConsensusLogForceSyncNum(ratisConfig.getDataRegionLogForceSyncNum());
conf.setSchemaRatisConsensusLogForceSyncNum(ratisConfig.getSchemaRegionLogForceSyncNum());
conf.setDataRatisConsensusLogSegmentSizeMax(ratisConfig.getDataLogSegmentSizeMax());
conf.setSchemaRatisConsensusLogSegmentSizeMax(ratisConfig.getSchemaLogSegmentSizeMax());
conf.setDataRatisConsensusGrpcFlowControlWindow(ratisConfig.getDataGrpcFlowControlWindow());
conf.setSchemaRatisConsensusGrpcFlowControlWindow(ratisConfig.getSchemaGrpcFlowControlWindow());
conf.setDataRatisConsensusGrpcLeaderOutstandingAppendsMax(
ratisConfig.getDataRegionGrpcLeaderOutstandingAppendsMax());
conf.setSchemaRatisConsensusGrpcLeaderOutstandingAppendsMax(
ratisConfig.getSchemaRegionGrpcLeaderOutstandingAppendsMax());
conf.setDataRatisConsensusLeaderElectionTimeoutMinMs(
ratisConfig.getDataLeaderElectionTimeoutMin());
conf.setSchemaRatisConsensusLeaderElectionTimeoutMinMs(
ratisConfig.getSchemaLeaderElectionTimeoutMin());
conf.setDataRatisConsensusLeaderElectionTimeoutMaxMs(
ratisConfig.getDataLeaderElectionTimeoutMax());
conf.setSchemaRatisConsensusLeaderElectionTimeoutMaxMs(
ratisConfig.getSchemaLeaderElectionTimeoutMax());
conf.setDataRatisConsensusRequestTimeoutMs(ratisConfig.getDataRequestTimeout());
conf.setSchemaRatisConsensusRequestTimeoutMs(ratisConfig.getSchemaRequestTimeout());
conf.setDataRatisConsensusMaxRetryAttempts(ratisConfig.getDataMaxRetryAttempts());
conf.setDataRatisConsensusInitialSleepTimeMs(ratisConfig.getDataInitialSleepTime());
conf.setDataRatisConsensusMaxSleepTimeMs(ratisConfig.getDataMaxSleepTime());
conf.setSchemaRatisConsensusMaxRetryAttempts(ratisConfig.getSchemaMaxRetryAttempts());
conf.setSchemaRatisConsensusInitialSleepTimeMs(ratisConfig.getSchemaInitialSleepTime());
conf.setSchemaRatisConsensusMaxSleepTimeMs(ratisConfig.getSchemaMaxSleepTime());
conf.setDataRatisConsensusPreserveWhenPurge(ratisConfig.getDataPreserveWhenPurge());
conf.setSchemaRatisConsensusPreserveWhenPurge(ratisConfig.getSchemaPreserveWhenPurge());
conf.setRatisFirstElectionTimeoutMinMs(ratisConfig.getFirstElectionTimeoutMin());
conf.setRatisFirstElectionTimeoutMaxMs(ratisConfig.getFirstElectionTimeoutMax());
conf.setSchemaRatisLogMax(ratisConfig.getSchemaRegionRatisLogMax());
conf.setDataRatisLogMax(ratisConfig.getDataRegionRatisLogMax());
conf.setSchemaRatisPeriodicSnapshotInterval(
ratisConfig.getSchemaRegionPeriodicSnapshotInterval());
conf.setDataRatisPeriodicSnapshotInterval(ratisConfig.getDataRegionPeriodicSnapshotInterval());
}
public void loadCQConfig(TCQConfig cqConfig) {
conf.setCqMinEveryIntervalInMs(cqConfig.getCqMinEveryIntervalInMs());
}
public void reclaimConsensusMemory() {
conf.setAllocateMemoryForStorageEngine(
conf.getAllocateMemoryForStorageEngine() + conf.getAllocateMemoryForConsensus());
SystemInfo.getInstance().allocateWriteMemory();
}
private static class IoTDBDescriptorHolder {
private static final IoTDBDescriptor INSTANCE = new IoTDBDescriptor();
private IoTDBDescriptorHolder() {}
}
}