| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.conf; |
| |
| import org.apache.iotdb.common.rpc.thrift.TEndPoint; |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.conf.IoTDBConstant; |
| import org.apache.iotdb.commons.exception.BadNodeUrlException; |
| import org.apache.iotdb.commons.utils.NodeUrlUtils; |
| import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig; |
| import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig; |
| import org.apache.iotdb.db.conf.directories.DirectoryManager; |
| import org.apache.iotdb.db.engine.StorageEngine; |
| import org.apache.iotdb.db.engine.compaction.constant.CompactionPriority; |
| import org.apache.iotdb.db.engine.compaction.constant.CrossCompactionPerformer; |
| import org.apache.iotdb.db.engine.compaction.constant.CrossCompactionSelector; |
| import org.apache.iotdb.db.engine.compaction.constant.InnerSeqCompactionPerformer; |
| import org.apache.iotdb.db.engine.compaction.constant.InnerSequenceCompactionSelector; |
| import org.apache.iotdb.db.engine.compaction.constant.InnerUnseqCompactionPerformer; |
| import org.apache.iotdb.db.engine.compaction.constant.InnerUnsequenceCompactionSelector; |
| import org.apache.iotdb.db.exception.BadNodeUrlFormatException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.qp.utils.DatetimeUtils; |
| import org.apache.iotdb.db.service.metrics.MetricService; |
| import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm; |
| import org.apache.iotdb.db.wal.WALManager; |
| import org.apache.iotdb.db.wal.utils.WALMode; |
| import org.apache.iotdb.metrics.config.MetricConfigDescriptor; |
| import org.apache.iotdb.metrics.config.ReloadLevel; |
| import org.apache.iotdb.rpc.RpcTransportFactory; |
| import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.fileSystem.FSType; |
| import org.apache.iotdb.tsfile.utils.FilePathUtils; |
| |
| import com.google.common.net.InetAddresses; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.InetAddress; |
| import java.net.MalformedURLException; |
| import java.net.URISyntaxException; |
| import java.net.URL; |
| import java.net.UnknownHostException; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.Properties; |
| |
| public class IoTDBDescriptor { |
| |
| private static final Logger logger = LoggerFactory.getLogger(IoTDBDescriptor.class); |
| |
| private final CommonDescriptor commonDescriptor = CommonDescriptor.getInstance(); |
| |
| private final IoTDBConfig conf = new IoTDBConfig(); |
| |
  protected IoTDBDescriptor() {
    // Eagerly load the configuration file (or fall back to defaults) as soon as
    // the singleton is constructed, so getConfig() always returns a populated config.
    loadProps();
  }
| |
  /** Returns the process-wide singleton descriptor (initialization-on-demand holder idiom). */
  public static IoTDBDescriptor getInstance() {
    return IoTDBDescriptorHolder.INSTANCE;
  }
| |
  /** Returns the live, mutable {@link IoTDBConfig} backing this descriptor. */
  public IoTDBConfig getConfig() {
    return conf;
  }
| |
| /** |
| * get props url location |
| * |
| * @return url object if location exit, otherwise null. |
| */ |
| public URL getPropsUrl() { |
| // Check if a config-directory was specified first. |
| String urlString = System.getProperty(IoTDBConstant.IOTDB_CONF, null); |
| // If it wasn't, check if a home directory was provided (This usually contains a config) |
| if (urlString == null) { |
| urlString = System.getProperty(IoTDBConstant.IOTDB_HOME, null); |
| if (urlString != null) { |
| urlString = |
| urlString + File.separatorChar + "conf" + File.separatorChar + IoTDBConfig.CONFIG_NAME; |
| } else { |
| // If this too wasn't provided, try to find a default config in the root of the classpath. |
| URL uri = IoTDBConfig.class.getResource("/" + IoTDBConfig.CONFIG_NAME); |
| if (uri != null) { |
| return uri; |
| } |
| logger.warn( |
| "Cannot find IOTDB_HOME or IOTDB_CONF environment variable when loading " |
| + "config file {}, use default configuration", |
| IoTDBConfig.CONFIG_NAME); |
| // update all data seriesPath |
| conf.updatePath(); |
| return null; |
| } |
| } |
| // If a config location was provided, but it doesn't end with a properties file, |
| // append the default location. |
| else if (!urlString.endsWith(".properties")) { |
| urlString += (File.separatorChar + IoTDBConfig.CONFIG_NAME); |
| } |
| |
| // If the url doesn't start with "file:" or "classpath:", it's provided as a no path. |
| // So we need to add it to make it a real URL. |
| if (!urlString.startsWith("file:") && !urlString.startsWith("classpath:")) { |
| urlString = "file:" + urlString; |
| } |
| try { |
| return new URL(urlString); |
| } catch (MalformedURLException e) { |
| return null; |
| } |
| } |
| |
| /** |
| * get props url location |
| * |
| * @return url object if location exit, otherwise null. |
| */ |
| public Path getExternalPropsPath() { |
| // Check if a config-directory was specified first. |
| String urlString = System.getProperty(IoTDBConstant.IOTDB_CONF, null); |
| // If it wasn't, check if a home directory was provided (This usually contains a config) |
| if (urlString == null) { |
| urlString = System.getProperty(IoTDBConstant.IOTDB_HOME, null); |
| if (urlString != null) { |
| urlString = |
| urlString |
| + File.separatorChar |
| + "conf" |
| + File.separatorChar |
| + IoTDBConfig.EXTERNAL_CONFIG_NAME; |
| } else { |
| // If this too wasn't provided, try to find a default config in the root of the classpath. |
| URL uri = IoTDBConfig.class.getResource("/" + IoTDBConfig.EXTERNAL_CONFIG_NAME); |
| if (uri != null) { |
| try { |
| return Paths.get(uri.toURI()); |
| } catch (URISyntaxException e) { |
| return null; |
| } |
| } |
| logger.warn( |
| "Cannot find IOTDB_HOME or IOTDB_EXTERNAL_CONF environment variable when loading " |
| + "config file {}, use default configuration", |
| IoTDBConfig.EXTERNAL_CONFIG_NAME); |
| // update all data seriesPath |
| conf.updatePath(); |
| return null; |
| } |
| } |
| // If a config location was provided, but it doesn't end with a properties file, |
| // append the default location. |
| else if (!urlString.endsWith(".properties")) { |
| urlString += (File.separatorChar + IoTDBConfig.EXTERNAL_CONFIG_NAME); |
| } |
| |
| return Paths.get(urlString); |
| } |
| |
| /** load an property file and set TsfileDBConfig variables. */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| private void loadProps() { |
| URL url = getPropsUrl(); |
| if (url == null) { |
| logger.warn("Couldn't load the configuration from any of the known sources."); |
| return; |
| } |
| |
| try (InputStream inputStream = url.openStream()) { |
| |
| logger.info("Start to read config file {}", url); |
| Properties properties = new Properties(); |
| properties.load(inputStream); |
| |
| loadProperties(properties); |
| |
| } catch (FileNotFoundException e) { |
| logger.warn("Fail to find config file {}", url, e); |
| } catch (IOException e) { |
| logger.warn("Cannot load config file, use default configuration", e); |
| } catch (Exception e) { |
| logger.warn("Incorrect format in config file, use default configuration", e); |
| } finally { |
| // update all data seriesPath |
| conf.updatePath(); |
| commonDescriptor.getConfig().updatePath(System.getProperty(IoTDBConstant.IOTDB_HOME, null)); |
| MetricConfigDescriptor.getInstance() |
| .getMetricConfig() |
| .updateRpcInstance(conf.getRpcAddress(), conf.getRpcPort()); |
| } |
| } |
| |
| public void loadProperties(Properties properties) { |
| |
| conf.setRpcAddress(properties.getProperty(IoTDBConstant.RPC_ADDRESS, conf.getRpcAddress())); |
| |
| // TODO: Use FQDN to identify our nodes afterwards |
| try { |
| replaceHostnameWithIP(); |
| } catch (Exception e) { |
| logger.info(String.format("replace hostname with ip failed, %s", e.getMessage())); |
| } |
| |
| conf.setRpcThriftCompressionEnable( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "rpc_thrift_compression_enable", |
| Boolean.toString(conf.isRpcThriftCompressionEnable())))); |
| |
| conf.setRpcAdvancedCompressionEnable( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "rpc_advanced_compression_enable", |
| Boolean.toString(conf.isRpcAdvancedCompressionEnable())))); |
| |
| conf.setConnectionTimeoutInMS( |
| Integer.parseInt( |
| properties.getProperty( |
| "connection_timeout_ms", String.valueOf(conf.getConnectionTimeoutInMS())))); |
| |
| conf.setMaxConnectionForInternalService( |
| Integer.parseInt( |
| properties.getProperty( |
| "max_connection_for_internal_service", |
| String.valueOf(conf.getMaxConnectionForInternalService())))); |
| |
| conf.setCoreConnectionForInternalService( |
| Integer.parseInt( |
| properties.getProperty( |
| "core_connection_for_internal_service", |
| String.valueOf(conf.getCoreConnectionForInternalService())))); |
| |
| conf.setSelectorNumOfClientManager( |
| Integer.parseInt( |
| properties.getProperty( |
| "selector_thread_nums_of_client_manager", |
| String.valueOf(conf.getSelectorNumOfClientManager())))); |
| |
| conf.setRpcPort( |
| Integer.parseInt( |
| properties.getProperty(IoTDBConstant.RPC_PORT, Integer.toString(conf.getRpcPort())))); |
| |
| conf.setEnableInfluxDBRpcService( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_influxdb_rpc_service", |
| Boolean.toString(conf.isEnableInfluxDBRpcService())))); |
| |
| conf.setInfluxDBRpcPort( |
| Integer.parseInt( |
| properties.getProperty( |
| "influxdb_rpc_port", Integer.toString(conf.getInfluxDBRpcPort())))); |
| |
| conf.setTimestampPrecision( |
| properties.getProperty("timestamp_precision", conf.getTimestampPrecision())); |
| |
| conf.setBufferedArraysMemoryProportion( |
| Double.parseDouble( |
| properties.getProperty( |
| "buffered_arrays_memory_proportion", |
| Double.toString(conf.getBufferedArraysMemoryProportion())))); |
| |
| conf.setFlushProportion( |
| Double.parseDouble( |
| properties.getProperty( |
| "flush_proportion", Double.toString(conf.getFlushProportion())))); |
| |
| conf.setRejectProportion( |
| Double.parseDouble( |
| properties.getProperty( |
| "reject_proportion", Double.toString(conf.getRejectProportion())))); |
| |
| conf.setStorageGroupSizeReportThreshold( |
| Long.parseLong( |
| properties.getProperty( |
| "storage_group_report_threshold", |
| Long.toString(conf.getStorageGroupSizeReportThreshold())))); |
| |
| conf.setMetaDataCacheEnable( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "meta_data_cache_enable", Boolean.toString(conf.isMetaDataCacheEnable())))); |
| |
| initMemoryAllocate(properties); |
| |
| loadWALProps(properties); |
| |
| String systemDir = properties.getProperty("system_dir"); |
| if (systemDir == null) { |
| systemDir = properties.getProperty("base_dir"); |
| if (systemDir != null) { |
| systemDir = FilePathUtils.regularizePath(systemDir) + IoTDBConstant.SYSTEM_FOLDER_NAME; |
| } else { |
| systemDir = conf.getSystemDir(); |
| } |
| } |
| conf.setSystemDir(systemDir); |
| |
| conf.setSchemaDir( |
| FilePathUtils.regularizePath(conf.getSystemDir()) + IoTDBConstant.SCHEMA_FOLDER_NAME); |
| |
| conf.setQueryDir( |
| FilePathUtils.regularizePath(conf.getSystemDir() + IoTDBConstant.QUERY_FOLDER_NAME)); |
| |
| conf.setTracingDir(properties.getProperty("tracing_dir", conf.getTracingDir())); |
| |
| conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0]).split(",")); |
| |
| conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir())); |
| |
| int mlogBufferSize = |
| Integer.parseInt( |
| properties.getProperty("mlog_buffer_size", Integer.toString(conf.getMlogBufferSize()))); |
| if (mlogBufferSize > 0) { |
| conf.setMlogBufferSize(mlogBufferSize); |
| } |
| |
| long forceMlogPeriodInMs = |
| Long.parseLong( |
| properties.getProperty( |
| "sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs()))); |
| if (forceMlogPeriodInMs > 0) { |
| conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs); |
| } |
| |
| String oldMultiDirStrategyClassName = conf.getMultiDirStrategyClassName(); |
| conf.setMultiDirStrategyClassName( |
| properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName())); |
| try { |
| conf.checkMultiDirStrategyClassName(); |
| } catch (Exception e) { |
| conf.setMultiDirStrategyClassName(oldMultiDirStrategyClassName); |
| throw e; |
| } |
| |
| conf.setBatchSize( |
| Integer.parseInt( |
| properties.getProperty("batch_size", Integer.toString(conf.getBatchSize())))); |
| |
| conf.setEnableMemControl( |
| (Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_mem_control", Boolean.toString(conf.isEnableMemControl()))))); |
| logger.info("IoTDB enable memory control: {}", conf.isEnableMemControl()); |
| |
| long seqTsFileSize = |
| Long.parseLong( |
| properties |
| .getProperty("seq_tsfile_size", Long.toString(conf.getSeqTsFileSize())) |
| .trim()); |
| if (seqTsFileSize >= 0) { |
| conf.setSeqTsFileSize(seqTsFileSize); |
| } |
| |
| long unSeqTsFileSize = |
| Long.parseLong( |
| properties |
| .getProperty("unseq_tsfile_size", Long.toString(conf.getUnSeqTsFileSize())) |
| .trim()); |
| if (unSeqTsFileSize >= 0) { |
| conf.setUnSeqTsFileSize(unSeqTsFileSize); |
| } |
| |
| long memTableSizeThreshold = |
| Long.parseLong( |
| properties |
| .getProperty( |
| "memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold())) |
| .trim()); |
| if (memTableSizeThreshold > 0) { |
| conf.setMemtableSizeThreshold(memTableSizeThreshold); |
| } |
| |
| conf.setTvListSortAlgorithm( |
| TVListSortAlgorithm.valueOf( |
| properties.getProperty( |
| "tvlist_sort_algorithm", conf.getTvListSortAlgorithm().toString()))); |
| |
| conf.setAvgSeriesPointNumberThreshold( |
| Integer.parseInt( |
| properties.getProperty( |
| "avg_series_point_number_threshold", |
| Integer.toString(conf.getAvgSeriesPointNumberThreshold())))); |
| |
| conf.setMaxChunkRawSizeThreshold( |
| Long.parseLong( |
| properties.getProperty( |
| "max_chunk_raw_size_threshold", |
| Long.toString(conf.getMaxChunkRawSizeThreshold())))); |
| |
| conf.setCheckPeriodWhenInsertBlocked( |
| Integer.parseInt( |
| properties.getProperty( |
| "check_period_when_insert_blocked", |
| Integer.toString(conf.getCheckPeriodWhenInsertBlocked())))); |
| |
| conf.setMaxWaitingTimeWhenInsertBlocked( |
| Integer.parseInt( |
| properties.getProperty( |
| "max_waiting_time_when_insert_blocked", |
| Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked())))); |
| |
| conf.setIoTaskQueueSizeForFlushing( |
| Integer.parseInt( |
| properties.getProperty( |
| "io_task_queue_size_for_flushing", |
| Integer.toString(conf.getIoTaskQueueSizeForFlushing())))); |
| |
| conf.setCompactionScheduleIntervalInMs( |
| Long.parseLong( |
| properties.getProperty( |
| "compaction_schedule_interval_in_ms", |
| Long.toString(conf.getCompactionScheduleIntervalInMs())))); |
| |
| conf.setCompactionSubmissionIntervalInMs( |
| Long.parseLong( |
| properties.getProperty( |
| "compaction_submission_interval_in_ms", |
| Long.toString(conf.getCompactionSubmissionIntervalInMs())))); |
| |
| conf.setEnableCrossSpaceCompaction( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_cross_space_compaction", |
| Boolean.toString(conf.isEnableCrossSpaceCompaction())))); |
| |
| conf.setEnableSeqSpaceCompaction( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_seq_space_compaction", |
| Boolean.toString(conf.isEnableSeqSpaceCompaction())))); |
| |
| conf.setEnableUnseqSpaceCompaction( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_unseq_space_compaction", |
| Boolean.toString(conf.isEnableUnseqSpaceCompaction())))); |
| |
| conf.setCrossCompactionSelector( |
| CrossCompactionSelector.getCrossCompactionSelector( |
| properties.getProperty( |
| "cross_selector", conf.getCrossCompactionSelector().toString()))); |
| |
| conf.setInnerSequenceCompactionSelector( |
| InnerSequenceCompactionSelector.getInnerSequenceCompactionSelector( |
| properties.getProperty( |
| "inner_seq_selector", conf.getInnerSequenceCompactionSelector().toString()))); |
| |
| conf.setInnerUnsequenceCompactionSelector( |
| InnerUnsequenceCompactionSelector.getInnerUnsequenceCompactionSelector( |
| properties.getProperty( |
| "inner_unseq_selector", conf.getInnerUnsequenceCompactionSelector().toString()))); |
| |
| conf.setInnerSeqCompactionPerformer( |
| InnerSeqCompactionPerformer.getInnerSeqCompactionPerformer( |
| properties.getProperty( |
| "inner_seq_performer", conf.getInnerSeqCompactionPerformer().toString()))); |
| |
| conf.setInnerUnseqCompactionPerformer( |
| InnerUnseqCompactionPerformer.getInnerUnseqCompactionPerformer( |
| properties.getProperty( |
| "inner_unseq_performer", conf.getInnerUnseqCompactionPerformer().toString()))); |
| |
| conf.setCrossCompactionPerformer( |
| CrossCompactionPerformer.getCrossCompactionPerformer( |
| properties.getProperty( |
| "cross_performer", conf.getCrossCompactionPerformer().toString()))); |
| |
| conf.setCompactionPriority( |
| CompactionPriority.valueOf( |
| properties.getProperty( |
| "compaction_priority", conf.getCompactionPriority().toString()))); |
| |
| int subtaskNum = |
| Integer.parseInt( |
| properties.getProperty( |
| "sub_compaction_thread_num", Integer.toString(conf.getSubCompactionTaskNum()))); |
| subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum; |
| conf.setSubCompactionTaskNum(subtaskNum); |
| |
| conf.setQueryTimeoutThreshold( |
| Long.parseLong( |
| properties.getProperty( |
| "query_timeout_threshold", Long.toString(conf.getQueryTimeoutThreshold())))); |
| |
| conf.setSessionTimeoutThreshold( |
| Integer.parseInt( |
| properties.getProperty( |
| "session_timeout_threshold", Integer.toString(conf.getSessionTimeoutThreshold())))); |
| conf.setMaxNumberOfSyncFileRetry( |
| Integer.parseInt( |
| properties |
| .getProperty( |
| "max_number_of_sync_file_retry", |
| Integer.toString(conf.getMaxNumberOfSyncFileRetry())) |
| .trim())); |
| |
| conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList())); |
| |
| conf.setConcurrentFlushThread( |
| Integer.parseInt( |
| properties.getProperty( |
| "concurrent_flush_thread", Integer.toString(conf.getConcurrentFlushThread())))); |
| |
| if (conf.getConcurrentFlushThread() <= 0) { |
| conf.setConcurrentFlushThread(Runtime.getRuntime().availableProcessors()); |
| } |
| |
| // start: index parameter setting |
| conf.setIndexRootFolder(properties.getProperty("index_root_dir", conf.getIndexRootFolder())); |
| |
| conf.setEnableIndex( |
| Boolean.parseBoolean( |
| properties.getProperty("enable_index", Boolean.toString(conf.isEnableIndex())))); |
| |
| conf.setConcurrentIndexBuildThread( |
| Integer.parseInt( |
| properties.getProperty( |
| "concurrent_index_build_thread", |
| Integer.toString(conf.getConcurrentIndexBuildThread())))); |
| if (conf.getConcurrentIndexBuildThread() <= 0) { |
| conf.setConcurrentIndexBuildThread(Runtime.getRuntime().availableProcessors()); |
| } |
| |
| conf.setDefaultIndexWindowRange( |
| Integer.parseInt( |
| properties.getProperty( |
| "default_index_window_range", |
| Integer.toString(conf.getDefaultIndexWindowRange())))); |
| |
| conf.setConcurrentQueryThread( |
| Integer.parseInt( |
| properties.getProperty( |
| "concurrent_query_thread", Integer.toString(conf.getConcurrentQueryThread())))); |
| |
| if (conf.getConcurrentQueryThread() <= 0) { |
| conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors()); |
| } |
| |
| conf.setMaxAllowedConcurrentQueries( |
| Integer.parseInt( |
| properties.getProperty( |
| "max_allowed_concurrent_queries", |
| Integer.toString(conf.getMaxAllowedConcurrentQueries())))); |
| |
| if (conf.getMaxAllowedConcurrentQueries() <= 0) { |
| conf.setMaxAllowedConcurrentQueries(1000); |
| } |
| |
| conf.setConcurrentSubRawQueryThread( |
| Integer.parseInt( |
| properties.getProperty( |
| "concurrent_sub_rawQuery_thread", |
| Integer.toString(conf.getConcurrentSubRawQueryThread())))); |
| |
| if (conf.getConcurrentSubRawQueryThread() <= 0) { |
| conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors()); |
| } |
| |
| conf.setRawQueryBlockingQueueCapacity( |
| Integer.parseInt( |
| properties.getProperty( |
| "raw_query_blocking_queue_capacity", |
| Integer.toString(conf.getRawQueryBlockingQueueCapacity())))); |
| |
| conf.setSchemaRegionDeviceNodeCacheSize( |
| Integer.parseInt( |
| properties |
| .getProperty( |
| "schema_region_device_node_cache_size", |
| Integer.toString(conf.getSchemaRegionDeviceNodeCacheSize())) |
| .trim())); |
| |
| conf.setmRemoteSchemaCacheSize( |
| Integer.parseInt( |
| properties |
| .getProperty( |
| "remote_schema_cache_size", Integer.toString(conf.getmRemoteSchemaCacheSize())) |
| .trim())); |
| |
| conf.setLanguageVersion( |
| properties.getProperty("language_version", conf.getLanguageVersion()).trim()); |
| |
| if (properties.containsKey("chunk_buffer_pool_enable")) { |
| conf.setChunkBufferPoolEnable( |
| Boolean.parseBoolean(properties.getProperty("chunk_buffer_pool_enable"))); |
| } |
| |
| conf.setEnableExternalSort( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_external_sort", Boolean.toString(conf.isEnableExternalSort())))); |
| conf.setExternalSortThreshold( |
| Integer.parseInt( |
| properties.getProperty( |
| "external_sort_threshold", Integer.toString(conf.getExternalSortThreshold())))); |
| conf.setUpgradeThreadNum( |
| Integer.parseInt( |
| properties.getProperty( |
| "upgrade_thread_num", Integer.toString(conf.getUpgradeThreadNum())))); |
| conf.setCrossCompactionFileSelectionTimeBudget( |
| Long.parseLong( |
| properties.getProperty( |
| "cross_compaction_file_selection_time_budget", |
| Long.toString(conf.getCrossCompactionFileSelectionTimeBudget())))); |
| conf.setMergeIntervalSec( |
| Long.parseLong( |
| properties.getProperty( |
| "merge_interval_sec", Long.toString(conf.getMergeIntervalSec())))); |
| conf.setConcurrentCompactionThread( |
| Integer.parseInt( |
| properties.getProperty( |
| "concurrent_compaction_thread", |
| Integer.toString(conf.getConcurrentCompactionThread())))); |
| conf.setChunkMetadataSizeProportionInCompaction( |
| Double.parseDouble( |
| properties.getProperty( |
| "chunk_metadata_size_proportion_in_compaction", |
| Double.toString(conf.getChunkMetadataSizeProportionInCompaction())))); |
| conf.setTargetCompactionFileSize( |
| Long.parseLong( |
| properties.getProperty( |
| "target_compaction_file_size", Long.toString(conf.getTargetCompactionFileSize())))); |
| conf.setTargetChunkSize( |
| Long.parseLong( |
| properties.getProperty("target_chunk_size", Long.toString(conf.getTargetChunkSize())))); |
| conf.setTargetChunkPointNum( |
| Long.parseLong( |
| properties.getProperty( |
| "target_chunk_point_num", Long.toString(conf.getTargetChunkPointNum())))); |
| conf.setChunkPointNumLowerBoundInCompaction( |
| Long.parseLong( |
| properties.getProperty( |
| "chunk_size_lower_bound_in_compaction", |
| Long.toString(conf.getChunkPointNumLowerBoundInCompaction())))); |
| conf.setChunkSizeLowerBoundInCompaction( |
| Long.parseLong( |
| properties.getProperty( |
| "chunk_size_lower_bound_in_compaction", |
| Long.toString(conf.getChunkSizeLowerBoundInCompaction())))); |
| conf.setMaxInnerCompactionCandidateFileNum( |
| Integer.parseInt( |
| properties.getProperty( |
| "max_inner_compaction_candidate_file_num", |
| Integer.toString(conf.getMaxInnerCompactionCandidateFileNum())))); |
| conf.setMaxCrossCompactionCandidateFileNum( |
| Integer.parseInt( |
| properties.getProperty( |
| "max_cross_compaction_candidate_file_num", |
| Integer.toString(conf.getMaxCrossCompactionCandidateFileNum())))); |
| conf.setMaxCrossCompactionCandidateFileSize( |
| Long.parseLong( |
| properties.getProperty( |
| "max_cross_compaction_candidate_file_size", |
| Long.toString(conf.getMaxCrossCompactionCandidateFileSize())))); |
| |
| conf.setCompactionWriteThroughputMbPerSec( |
| Integer.parseInt( |
| properties.getProperty( |
| "compaction_write_throughput_mb_per_sec", |
| Integer.toString(conf.getCompactionWriteThroughputMbPerSec())))); |
| |
| conf.setEnablePartialInsert( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_partial_insert", String.valueOf(conf.isEnablePartialInsert())))); |
| |
| int rpcSelectorThreadNum = |
| Integer.parseInt( |
| properties.getProperty( |
| "rpc_selector_thread_num", |
| Integer.toString(conf.getRpcSelectorThreadNum()).trim())); |
| if (rpcSelectorThreadNum <= 0) { |
| rpcSelectorThreadNum = 1; |
| } |
| |
| conf.setRpcSelectorThreadNum(rpcSelectorThreadNum); |
| |
| int minConcurrentClientNum = |
| Integer.parseInt( |
| properties.getProperty( |
| "rpc_min_concurrent_client_num", |
| Integer.toString(conf.getRpcMinConcurrentClientNum()).trim())); |
| if (minConcurrentClientNum <= 0) { |
| minConcurrentClientNum = Runtime.getRuntime().availableProcessors(); |
| } |
| |
| conf.setRpcMinConcurrentClientNum(minConcurrentClientNum); |
| |
| int maxConcurrentClientNum = |
| Integer.parseInt( |
| properties.getProperty( |
| "rpc_max_concurrent_client_num", |
| Integer.toString(conf.getRpcMaxConcurrentClientNum()).trim())); |
| if (maxConcurrentClientNum <= 0) { |
| maxConcurrentClientNum = 65535; |
| } |
| |
| conf.setRpcMaxConcurrentClientNum(maxConcurrentClientNum); |
| |
| conf.setEnableWatermark( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "watermark_module_opened", Boolean.toString(conf.isEnableWatermark()).trim()))); |
| conf.setWatermarkSecretKey( |
| properties.getProperty("watermark_secret_key", conf.getWatermarkSecretKey())); |
| conf.setWatermarkBitString( |
| properties.getProperty("watermark_bit_string", conf.getWatermarkBitString())); |
| conf.setWatermarkMethod(properties.getProperty("watermark_method", conf.getWatermarkMethod())); |
| |
| loadAutoCreateSchemaProps(properties); |
| |
| conf.setTsFileStorageFs( |
| properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().toString())); |
| conf.setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath())); |
| conf.setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath())); |
| conf.setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(",")); |
| conf.setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort())); |
| conf.setDfsNameServices(properties.getProperty("dfs_nameservices", conf.getDfsNameServices())); |
| conf.setDfsHaNamenodes( |
| properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(",")); |
| conf.setDfsHaAutomaticFailoverEnabled( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "dfs_ha_automatic_failover_enabled", |
| String.valueOf(conf.isDfsHaAutomaticFailoverEnabled())))); |
| conf.setDfsClientFailoverProxyProvider( |
| properties.getProperty( |
| "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider())); |
| conf.setUseKerberos( |
| Boolean.parseBoolean( |
| properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos())))); |
| conf.setKerberosKeytabFilePath( |
| properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath())); |
| conf.setKerberosPrincipal( |
| properties.getProperty("kerberos_principal", conf.getKerberosPrincipal())); |
| |
| // the num of memtables in each storage group |
| conf.setConcurrentWritingTimePartition( |
| Integer.parseInt( |
| properties.getProperty( |
| "concurrent_writing_time_partition", |
| String.valueOf(conf.getConcurrentWritingTimePartition())))); |
| |
| // the default fill interval in LinearFill and PreviousFill |
| conf.setDefaultFillInterval( |
| Integer.parseInt( |
| properties.getProperty( |
| "default_fill_interval", String.valueOf(conf.getDefaultFillInterval())))); |
| |
| conf.setTagAttributeTotalSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "tag_attribute_total_size", String.valueOf(conf.getTagAttributeTotalSize())))); |
| |
| conf.setTagAttributeFlushInterval( |
| Integer.parseInt( |
| properties.getProperty( |
| "tag_attribute_flush_interval", |
| String.valueOf(conf.getTagAttributeFlushInterval())))); |
| |
| conf.setPrimitiveArraySize( |
| (Integer.parseInt( |
| properties.getProperty( |
| "primitive_array_size", String.valueOf(conf.getPrimitiveArraySize()))))); |
| |
| conf.setThriftMaxFrameSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "thrift_max_frame_size", String.valueOf(conf.getThriftMaxFrameSize())))); |
| |
| if (conf.getThriftMaxFrameSize() < IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2) { |
| conf.setThriftMaxFrameSize(IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2); |
| } |
| |
| conf.setThriftDefaultBufferSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "thrift_init_buffer_size", String.valueOf(conf.getThriftDefaultBufferSize())))); |
| |
| conf.setFrequencyIntervalInMinute( |
| Integer.parseInt( |
| properties.getProperty( |
| "frequency_interval_in_minute", |
| String.valueOf(conf.getFrequencyIntervalInMinute())))); |
| |
| conf.setSlowQueryThreshold( |
| Long.parseLong( |
| properties.getProperty( |
| "slow_query_threshold", String.valueOf(conf.getSlowQueryThreshold())))); |
| |
| conf.setDataRegionNum( |
| Integer.parseInt( |
| properties.getProperty("data_region_num", String.valueOf(conf.getDataRegionNum())))); |
| |
| conf.setRecoveryLogIntervalInMs( |
| Long.parseLong( |
| properties.getProperty( |
| "recovery_log_interval_in_ms", String.valueOf(conf.getRecoveryLogIntervalInMs())))); |
| |
| conf.setEnableDiscardOutOfOrderData( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_discard_out_of_order_data", |
| Boolean.toString(conf.isEnableDiscardOutOfOrderData())))); |
| |
| conf.setConcurrentWindowEvaluationThread( |
| Integer.parseInt( |
| properties.getProperty( |
| "concurrent_window_evaluation_thread", |
| Integer.toString(conf.getConcurrentWindowEvaluationThread())))); |
| if (conf.getConcurrentWindowEvaluationThread() <= 0) { |
| conf.setConcurrentWindowEvaluationThread(Runtime.getRuntime().availableProcessors()); |
| } |
| |
| conf.setMaxPendingWindowEvaluationTasks( |
| Integer.parseInt( |
| properties.getProperty( |
| "max_pending_window_evaluation_tasks", |
| Integer.toString(conf.getMaxPendingWindowEvaluationTasks())))); |
| if (conf.getMaxPendingWindowEvaluationTasks() <= 0) { |
| conf.setMaxPendingWindowEvaluationTasks(64); |
| } |
| |
| // id table related configuration |
| conf.setDeviceIDTransformationMethod( |
| properties.getProperty( |
| "device_id_transformation_method", conf.getDeviceIDTransformationMethod())); |
| |
| conf.setEnableIDTable( |
| Boolean.parseBoolean( |
| properties.getProperty("enable_id_table", String.valueOf(conf.isEnableIDTable())))); |
| |
| conf.setEnableIDTableLogFile( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_id_table_log_file", String.valueOf(conf.isEnableIDTableLogFile())))); |
| |
| conf.setSchemaEngineMode( |
| properties.getProperty("schema_engine_mode", String.valueOf(conf.getSchemaEngineMode()))); |
| |
| conf.setEnableLastCache( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_last_cache", Boolean.toString(conf.isLastCacheEnabled())))); |
| |
| if (conf.getSchemaEngineMode().equals("Rocksdb_based")) { |
| conf.setEnableLastCache(false); |
| } |
| |
| conf.setCachedMNodeSizeInSchemaFileMode( |
| Integer.parseInt( |
| properties.getProperty( |
| "cached_mnode_size_in_schema_file_mode", |
| String.valueOf(conf.getCachedMNodeSizeInSchemaFileMode())))); |
| |
| conf.setMinimumSegmentInSchemaFile( |
| Short.parseShort( |
| properties.getProperty( |
| "minimum_schema_file_segment_in_bytes", |
| String.valueOf(conf.getMinimumSegmentInSchemaFile())))); |
| |
| conf.setPageCacheSizeInSchemaFile( |
| Short.parseShort( |
| properties.getProperty( |
| "page_cache_in_schema_file", String.valueOf(conf.getPageCacheSizeInSchemaFile())))); |
| |
| // mqtt |
| loadMqttProps(properties); |
| |
| conf.setEnablePartition( |
| Boolean.parseBoolean( |
| properties.getProperty("enable_partition", String.valueOf(conf.isEnablePartition())))); |
| |
| conf.setPartitionInterval( |
| Long.parseLong( |
| properties.getProperty( |
| "partition_interval", String.valueOf(conf.getPartitionInterval())))); |
| |
| conf.setSelectIntoInsertTabletPlanRowLimit( |
| Integer.parseInt( |
| properties.getProperty( |
| "select_into_insert_tablet_plan_row_limit", |
| String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit())))); |
| |
| conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim()); |
| |
| conf.setInsertMultiTabletEnableMultithreadingColumnThreshold( |
| Integer.parseInt( |
| properties.getProperty( |
| "insert_multi_tablet_enable_multithreading_column_threshold", |
| String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold())))); |
| |
| // At the same time, set TSFileConfig |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setTSFileStorageFs( |
| FSType.valueOf( |
| properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name()))); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath())); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath())); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(",")); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort())); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setDfsNameServices(properties.getProperty("dfs_nameservices", conf.getDfsNameServices())); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setDfsHaNamenodes( |
| properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(",")); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setDfsHaAutomaticFailoverEnabled( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "dfs_ha_automatic_failover_enabled", |
| String.valueOf(conf.isDfsHaAutomaticFailoverEnabled())))); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setDfsClientFailoverProxyProvider( |
| properties.getProperty( |
| "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider())); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setUseKerberos( |
| Boolean.parseBoolean( |
| properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos())))); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setKerberosKeytabFilePath( |
| properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath())); |
| TSFileDescriptor.getInstance() |
| .getConfig() |
| .setKerberosPrincipal( |
| properties.getProperty("kerberos_principal", conf.getKerberosPrincipal())); |
| TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize()); |
| |
| conf.setCoordinatorReadExecutorSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "coordinator_read_executor_size", |
| Integer.toString(conf.getCoordinatorReadExecutorSize())))); |
| conf.setCoordinatorWriteExecutorSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "coordinator_write_executor_size", |
| Integer.toString(conf.getCoordinatorWriteExecutorSize())))); |
| |
| // commons |
| commonDescriptor.loadCommonProps(properties); |
| commonDescriptor.initCommonConfigDir(conf.getSystemDir()); |
| |
| // timed flush memtable |
| loadTimedService(properties); |
| |
| // set tsfile-format config |
| loadTsFileProps(properties); |
| |
| // make RPCTransportFactory taking effect. |
| RpcTransportFactory.reInit(); |
| |
| // UDF |
| loadUDFProps(properties); |
| |
| // trigger |
| loadTriggerProps(properties); |
| |
| // CQ |
| loadCQProps(properties); |
| |
| // cluster |
| loadClusterProps(properties); |
| |
| // shuffle |
| loadShuffleProps(properties); |
| |
| // author cache |
| loadAuthorCache(properties); |
| } |
| |
| private void loadAuthorCache(Properties properties) { |
| conf.setAuthorCacheSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "author_cache_size", String.valueOf(conf.getAuthorCacheSize())))); |
| conf.setAuthorCacheExpireTime( |
| Integer.parseInt( |
| properties.getProperty( |
| "author_cache_expire_time", String.valueOf(conf.getAuthorCacheExpireTime())))); |
| } |
| |
| // to keep consistent with the cluster module. |
| private void replaceHostnameWithIP() throws UnknownHostException, BadNodeUrlFormatException { |
| boolean isInvalidRpcIp = InetAddresses.isInetAddress(conf.getRpcAddress()); |
| if (!isInvalidRpcIp) { |
| conf.setRpcAddress(InetAddress.getByName(conf.getRpcAddress()).getHostAddress()); |
| } |
| |
| boolean isInvalidInternalIp = InetAddresses.isInetAddress(conf.getInternalAddress()); |
| if (!isInvalidInternalIp) { |
| conf.setInternalAddress(InetAddress.getByName(conf.getInternalAddress()).getHostAddress()); |
| } |
| |
| for (TEndPoint configNode : conf.getTargetConfigNodeList()) { |
| boolean isInvalidNodeIp = InetAddresses.isInetAddress(configNode.ip); |
| if (!isInvalidNodeIp) { |
| String newNodeIP = InetAddress.getByName(configNode.ip).getHostAddress(); |
| configNode.setIp(newNodeIP); |
| } |
| } |
| |
| logger.debug( |
| "after replace, the rpcIP={}, internalIP={}, configNodeUrls={}", |
| conf.getRpcAddress(), |
| conf.getInternalAddress(), |
| conf.getTargetConfigNodeList()); |
| } |
| |
| private void loadWALProps(Properties properties) { |
| conf.setWalMode( |
| WALMode.valueOf((properties.getProperty("wal_mode", conf.getWalMode().toString())))); |
| |
| int maxWalNodesNum = |
| Integer.parseInt( |
| properties.getProperty( |
| "max_wal_nodes_num", Integer.toString(conf.getMaxWalNodesNum()))); |
| if (maxWalNodesNum > 0) { |
| conf.setMaxWalNodesNum(maxWalNodesNum); |
| } |
| |
| int walBufferSize = |
| Integer.parseInt( |
| properties.getProperty( |
| "wal_buffer_size_in_byte", Integer.toString(conf.getWalBufferSize()))); |
| if (walBufferSize > 0) { |
| conf.setWalBufferSize(walBufferSize); |
| } |
| |
| int walBufferEntrySize = |
| Integer.parseInt( |
| properties.getProperty( |
| "wal_buffer_entry_size_in_byte", Integer.toString(conf.getWalBufferEntrySize()))); |
| if (walBufferEntrySize > 0) { |
| conf.setWalBufferEntrySize(walBufferEntrySize); |
| } |
| |
| int walBufferQueueCapacity = |
| Integer.parseInt( |
| properties.getProperty( |
| "wal_buffer_queue_capacity", Integer.toString(conf.getWalBufferQueueCapacity()))); |
| if (walBufferQueueCapacity > 0) { |
| conf.setWalBufferQueueCapacity(walBufferQueueCapacity); |
| } |
| |
| loadWALHotModifiedProps(properties); |
| } |
| |
| private void loadWALHotModifiedProps(Properties properties) { |
| long fsyncWalDelayInMs = |
| Long.parseLong( |
| properties.getProperty( |
| "fsync_wal_delay_in_ms", Long.toString(conf.getFsyncWalDelayInMs()))); |
| if (fsyncWalDelayInMs > 0) { |
| conf.setFsyncWalDelayInMs(fsyncWalDelayInMs); |
| } |
| |
| long walFileSizeThreshold = |
| Long.parseLong( |
| properties.getProperty( |
| "wal_file_size_threshold_in_byte", |
| Long.toString(conf.getWalFileSizeThresholdInByte()))); |
| if (walFileSizeThreshold > 0) { |
| conf.setWalFileSizeThresholdInByte(walFileSizeThreshold); |
| } |
| |
| double walMinEffectiveInfoRatio = |
| Double.parseDouble( |
| properties.getProperty( |
| "wal_min_effective_info_ratio", |
| Double.toString(conf.getWalMinEffectiveInfoRatio()))); |
| if (walMinEffectiveInfoRatio > 0) { |
| conf.setWalMinEffectiveInfoRatio(walMinEffectiveInfoRatio); |
| } |
| |
| long walMemTableSnapshotThreshold = |
| Long.parseLong( |
| properties.getProperty( |
| "wal_memtable_snapshot_threshold_in_byte", |
| Long.toString(conf.getWalMemTableSnapshotThreshold()))); |
| if (walMemTableSnapshotThreshold > 0) { |
| conf.setWalMemTableSnapshotThreshold(walMemTableSnapshotThreshold); |
| } |
| |
| int maxWalMemTableSnapshotNum = |
| Integer.parseInt( |
| properties.getProperty( |
| "max_wal_memtable_snapshot_num", |
| Integer.toString(conf.getMaxWalMemTableSnapshotNum()))); |
| if (maxWalMemTableSnapshotNum > 0) { |
| conf.setMaxWalMemTableSnapshotNum(maxWalMemTableSnapshotNum); |
| } |
| |
| long deleteWalFilesPeriod = |
| Long.parseLong( |
| properties.getProperty( |
| "delete_wal_files_period_in_ms", |
| Long.toString(conf.getDeleteWalFilesPeriodInMs()))); |
| if (deleteWalFilesPeriod > 0) { |
| conf.setDeleteWalFilesPeriodInMs(deleteWalFilesPeriod); |
| } |
| |
| long throttleDownThresholdInByte = |
| Long.parseLong( |
| properties.getProperty( |
| "multi_leader_throttle_threshold_in_byte", |
| Long.toString(conf.getThrottleThreshold()))); |
| if (throttleDownThresholdInByte > 0) { |
| conf.setThrottleThreshold(throttleDownThresholdInByte); |
| } |
| |
| long cacheWindowInMs = |
| Long.parseLong( |
| properties.getProperty( |
| "multi_leader_cache_window_time_in_ms", |
| Long.toString(conf.getCacheWindowTimeInMs()))); |
| if (cacheWindowInMs > 0) { |
| conf.setCacheWindowTimeInMs(cacheWindowInMs); |
| } |
| } |
| |
| private void loadAutoCreateSchemaProps(Properties properties) { |
| conf.setAutoCreateSchemaEnabled( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_auto_create_schema", |
| Boolean.toString(conf.isAutoCreateSchemaEnabled()).trim()))); |
| conf.setBooleanStringInferType( |
| TSDataType.valueOf( |
| properties.getProperty( |
| "boolean_string_infer_type", conf.getBooleanStringInferType().toString()))); |
| conf.setIntegerStringInferType( |
| TSDataType.valueOf( |
| properties.getProperty( |
| "integer_string_infer_type", conf.getIntegerStringInferType().toString()))); |
| conf.setLongStringInferType( |
| TSDataType.valueOf( |
| properties.getProperty( |
| "long_string_infer_type", conf.getLongStringInferType().toString()))); |
| conf.setFloatingStringInferType( |
| TSDataType.valueOf( |
| properties.getProperty( |
| "floating_string_infer_type", conf.getFloatingStringInferType().toString()))); |
| conf.setNanStringInferType( |
| TSDataType.valueOf( |
| properties.getProperty( |
| "nan_string_infer_type", conf.getNanStringInferType().toString()))); |
| conf.setDefaultStorageGroupLevel( |
| Integer.parseInt( |
| properties.getProperty( |
| "default_storage_group_level", |
| Integer.toString(conf.getDefaultStorageGroupLevel())))); |
| conf.setDefaultBooleanEncoding( |
| properties.getProperty( |
| "default_boolean_encoding", conf.getDefaultBooleanEncoding().toString())); |
| conf.setDefaultInt32Encoding( |
| properties.getProperty( |
| "default_int32_encoding", conf.getDefaultInt32Encoding().toString())); |
| conf.setDefaultInt64Encoding( |
| properties.getProperty( |
| "default_int64_encoding", conf.getDefaultInt64Encoding().toString())); |
| conf.setDefaultFloatEncoding( |
| properties.getProperty( |
| "default_float_encoding", conf.getDefaultFloatEncoding().toString())); |
| conf.setDefaultDoubleEncoding( |
| properties.getProperty( |
| "default_double_encoding", conf.getDefaultDoubleEncoding().toString())); |
| conf.setDefaultTextEncoding( |
| properties.getProperty("default_text_encoding", conf.getDefaultTextEncoding().toString())); |
| } |
| |
  /**
   * Pushes TsFile-format related properties into the global {@code TSFileDescriptor} singleton
   * config.
   *
   * <p>Statement order matters: the page-size clamp must run after both group size and page size
   * have been set, and the tsblock-size cap must run after the raw value has been parsed.
   */
  private void loadTsFileProps(Properties properties) {
    TSFileDescriptor.getInstance()
        .getConfig()
        .setGroupSizeInByte(
            Integer.parseInt(
                properties.getProperty(
                    "group_size_in_byte",
                    Integer.toString(
                        TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte()))));
    TSFileDescriptor.getInstance()
        .getConfig()
        .setPageSizeInByte(
            Integer.parseInt(
                properties.getProperty(
                    "page_size_in_byte",
                    Integer.toString(
                        TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()))));
    // A page cannot be larger than its enclosing group: clamp the page size down if needed.
    if (TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
        > TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte()) {
      logger.warn("page_size is greater than group size, will set it as the same with group size");
      TSFileDescriptor.getInstance()
          .getConfig()
          .setPageSizeInByte(TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte());
    }
    TSFileDescriptor.getInstance()
        .getConfig()
        .setMaxNumberOfPointsInPage(
            Integer.parseInt(
                properties.getProperty(
                    "max_number_of_points_in_page",
                    Integer.toString(
                        TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage()))));
    TSFileDescriptor.getInstance()
        .getConfig()
        .setMaxStringLength(
            Integer.parseInt(
                properties.getProperty(
                    "max_string_length",
                    Integer.toString(
                        TSFileDescriptor.getInstance().getConfig().getMaxStringLength()))));
    TSFileDescriptor.getInstance()
        .getConfig()
        .setBloomFilterErrorRate(
            Double.parseDouble(
                properties.getProperty(
                    "bloom_filter_error_rate",
                    Double.toString(
                        TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate()))));
    TSFileDescriptor.getInstance()
        .getConfig()
        .setFloatPrecision(
            Integer.parseInt(
                properties.getProperty(
                    "float_precision",
                    Integer.toString(
                        TSFileDescriptor.getInstance().getConfig().getFloatPrecision()))));
    TSFileDescriptor.getInstance()
        .getConfig()
        .setTimeEncoder(
            properties.getProperty(
                "time_encoder", TSFileDescriptor.getInstance().getConfig().getTimeEncoder()));
    TSFileDescriptor.getInstance()
        .getConfig()
        .setValueEncoder(
            properties.getProperty(
                "value_encoder", TSFileDescriptor.getInstance().getConfig().getValueEncoder()));
    TSFileDescriptor.getInstance()
        .getConfig()
        .setCompressor(
            properties.getProperty(
                "compressor",
                TSFileDescriptor.getInstance().getConfig().getCompressor().toString()));
    TSFileDescriptor.getInstance()
        .getConfig()
        .setMaxDegreeOfIndexNode(
            Integer.parseInt(
                properties.getProperty(
                    "max_degree_of_index_node",
                    Integer.toString(
                        TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode()))));
    TSFileDescriptor.getInstance()
        .getConfig()
        .setMaxTsBlockSizeInBytes(
            Integer.parseInt(
                properties.getProperty(
                    "max_tsblock_size_in_bytes",
                    Integer.toString(
                        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes()))));

    // min(default_size, maxBytesForQuery)
    // Cap the configured tsblock size by the per-query memory budget from conf.
    TSFileDescriptor.getInstance()
        .getConfig()
        .setMaxTsBlockSizeInBytes(
            (int)
                Math.min(
                    TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
                    conf.getMaxBytesPerQuery()));

    TSFileDescriptor.getInstance()
        .getConfig()
        .setMaxTsBlockLineNumber(
            Integer.parseInt(
                properties.getProperty(
                    "max_tsblock_line_number",
                    Integer.toString(
                        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()))));
  }
| |
| // Mqtt related |
| private void loadMqttProps(Properties properties) { |
| conf.setMqttDir(properties.getProperty("mqtt_root_dir", conf.getMqttDir())); |
| |
| if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) { |
| conf.setMqttHost(properties.getProperty(IoTDBConstant.MQTT_HOST_NAME)); |
| } |
| |
| if (properties.getProperty(IoTDBConstant.MQTT_PORT_NAME) != null) { |
| conf.setMqttPort(Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_PORT_NAME))); |
| } |
| |
| if (properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME) != null) { |
| conf.setMqttHandlerPoolSize( |
| Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME))); |
| } |
| |
| if (properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME) != null) { |
| conf.setMqttPayloadFormatter( |
| properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME)); |
| } |
| |
| if (properties.getProperty(IoTDBConstant.ENABLE_MQTT) != null) { |
| conf.setEnableMQTTService( |
| Boolean.parseBoolean(properties.getProperty(IoTDBConstant.ENABLE_MQTT))); |
| } |
| |
| if (properties.getProperty(IoTDBConstant.MQTT_MAX_MESSAGE_SIZE) != null) { |
| conf.setMqttMaxMessageSize( |
| Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_MAX_MESSAGE_SIZE))); |
| } |
| } |
| |
| private void loadExternalLibProps(Properties properties) { |
| |
| conf.setExternalPropertiesLoaderDir( |
| properties.getProperty( |
| "external_properties_loader_dir", conf.getExternalPropertiesLoaderDir())); |
| |
| conf.setExternalLimiterDir( |
| properties.getProperty("external_limiter_dir", conf.getExternalLimiterDir())); |
| } |
| |
| // timed flush memtable |
| private void loadTimedService(Properties properties) { |
| conf.setEnableTimedFlushSeqMemtable( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_timed_flush_seq_memtable", |
| Boolean.toString(conf.isEnableTimedFlushSeqMemtable())))); |
| |
| long seqMemTableFlushInterval = |
| Long.parseLong( |
| properties |
| .getProperty( |
| "seq_memtable_flush_interval_in_ms", |
| Long.toString(conf.getSeqMemtableFlushInterval())) |
| .trim()); |
| if (seqMemTableFlushInterval > 0) { |
| conf.setSeqMemtableFlushInterval(seqMemTableFlushInterval); |
| } |
| |
| long seqMemTableFlushCheckInterval = |
| Long.parseLong( |
| properties |
| .getProperty( |
| "seq_memtable_flush_check_interval_in_ms", |
| Long.toString(conf.getSeqMemtableFlushCheckInterval())) |
| .trim()); |
| if (seqMemTableFlushCheckInterval > 0) { |
| conf.setSeqMemtableFlushCheckInterval(seqMemTableFlushCheckInterval); |
| } |
| |
| conf.setEnableTimedFlushUnseqMemtable( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_timed_flush_unseq_memtable", |
| Boolean.toString(conf.isEnableTimedFlushUnseqMemtable())))); |
| |
| long unseqMemTableFlushInterval = |
| Long.parseLong( |
| properties |
| .getProperty( |
| "unseq_memtable_flush_interval_in_ms", |
| Long.toString(conf.getUnseqMemtableFlushInterval())) |
| .trim()); |
| if (unseqMemTableFlushInterval > 0) { |
| conf.setUnseqMemtableFlushInterval(unseqMemTableFlushInterval); |
| } |
| |
| long unseqMemTableFlushCheckInterval = |
| Long.parseLong( |
| properties |
| .getProperty( |
| "unseq_memtable_flush_check_interval_in_ms", |
| Long.toString(conf.getUnseqMemtableFlushCheckInterval())) |
| .trim()); |
| if (unseqMemTableFlushCheckInterval > 0) { |
| conf.setUnseqMemtableFlushCheckInterval(unseqMemTableFlushCheckInterval); |
| } |
| } |
| |
  /**
   * Applies the subset of properties that may be modified while the server is running.
   *
   * <p>Ordering is significant: data dirs are reloaded before the multi-dir strategy (which
   * depends on them), and the WAL delete-thread is rebooted only when the delete period actually
   * changed during {@code loadWALHotModifiedProps}.
   *
   * @param properties the freshly reloaded property set
   * @throws QueryProcessException if any individual reload step fails
   */
  public void loadHotModifiedProps(Properties properties) throws QueryProcessException {
    try {
      // update data dirs
      String dataDirs = properties.getProperty("data_dirs", null);
      if (dataDirs != null) {
        conf.reloadDataDirs(dataDirs.split(","));
      }

      // update dir strategy, must update after data dirs
      String multiDirStrategyClassName = properties.getProperty("multi_dir_strategy", null);
      if (multiDirStrategyClassName != null
          && !multiDirStrategyClassName.equals(conf.getMultiDirStrategyClassName())) {
        conf.setMultiDirStrategyClassName(multiDirStrategyClassName);
        conf.confirmMultiDirStrategy();
        DirectoryManager.getInstance().updateDirectoryStrategy();
      }

      // update timed flush & close conf
      loadTimedService(properties);
      StorageEngine.getInstance().rebootTimedService();

      // Note: these file-size thresholds accept 0 (>= 0), unlike most other settings (> 0).
      long seqTsFileSize =
          Long.parseLong(
              properties
                  .getProperty("seq_tsfile_size", Long.toString(conf.getSeqTsFileSize()))
                  .trim());
      if (seqTsFileSize >= 0) {
        conf.setSeqTsFileSize(seqTsFileSize);
      }

      long unSeqTsFileSize =
          Long.parseLong(
              properties
                  .getProperty("unseq_tsfile_size", Long.toString(conf.getUnSeqTsFileSize()))
                  .trim());
      if (unSeqTsFileSize >= 0) {
        conf.setUnSeqTsFileSize(unSeqTsFileSize);
      }

      long memTableSizeThreshold =
          Long.parseLong(
              properties
                  .getProperty(
                      "memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold()))
                  .trim());
      if (memTableSizeThreshold > 0) {
        conf.setMemtableSizeThreshold(memTableSizeThreshold);
      }

      // update params of creating schema automatically
      loadAutoCreateSchemaProps(properties);

      // update tsfile-format config
      loadTsFileProps(properties);

      conf.setChunkMetadataSizeProportionInWrite(
          Double.parseDouble(
              properties.getProperty(
                  "chunk_metadata_size_proportion_in_write",
                  Double.toString(conf.getChunkMetadataSizeProportionInWrite()))));

      // update max_deduplicated_path_num
      conf.setMaxQueryDeduplicatedPathNum(
          Integer.parseInt(
              properties.getProperty(
                  "max_deduplicated_path_num",
                  Integer.toString(conf.getMaxQueryDeduplicatedPathNum()))));
      // update frequency_interval_in_minute
      conf.setFrequencyIntervalInMinute(
          Integer.parseInt(
              properties.getProperty(
                  "frequency_interval_in_minute",
                  Integer.toString(conf.getFrequencyIntervalInMinute()))));
      // update slow_query_threshold
      conf.setSlowQueryThreshold(
          Long.parseLong(
              properties.getProperty(
                  "slow_query_threshold", Long.toString(conf.getSlowQueryThreshold()))));
      // update merge_write_throughput_mb_per_sec
      conf.setCompactionWriteThroughputMbPerSec(
          Integer.parseInt(
              properties.getProperty(
                  "merge_write_throughput_mb_per_sec",
                  Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));

      // update insert-tablet-plan's row limit for select-into
      conf.setSelectIntoInsertTabletPlanRowLimit(
          Integer.parseInt(
              properties.getProperty(
                  "select_into_insert_tablet_plan_row_limit",
                  String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));

      // update sync config
      conf.setMaxNumberOfSyncFileRetry(
          Integer.parseInt(
              properties
                  .getProperty(
                      "max_number_of_sync_file_retry",
                      Integer.toString(conf.getMaxNumberOfSyncFileRetry()))
                  .trim()));
      conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));

      // update enable query memory estimation for memory control
      conf.setEnableQueryMemoryEstimation(
          Boolean.parseBoolean(
              properties.getProperty(
                  "enable_query_memory_estimation",
                  Boolean.toString(conf.isEnableQueryMemoryEstimation()))));

      // update wal config
      // Snapshot the old delete period so the WAL delete-thread is only rebooted on change.
      long prevDeleteWalFilesPeriodInMs = conf.getDeleteWalFilesPeriodInMs();
      loadWALHotModifiedProps(properties);
      if (prevDeleteWalFilesPeriodInMs != conf.getDeleteWalFilesPeriodInMs()) {
        WALManager.getInstance().rebootWALDeleteThread();
      }
    } catch (Exception e) {
      // NOTE(review): the cause's stack trace is lost here; only e.toString() is kept in the
      // message. Consider a QueryProcessException constructor that accepts a cause — verify one
      // exists before changing.
      throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
    }
  }
| |
| public void loadHotModifiedProps() throws QueryProcessException { |
| URL url = getPropsUrl(); |
| if (url == null) { |
| logger.warn("Couldn't load the configuration from any of the known sources."); |
| return; |
| } |
| |
| try (InputStream inputStream = url.openStream()) { |
| logger.info("Start to reload config file {}", url); |
| Properties properties = new Properties(); |
| properties.load(inputStream); |
| loadHotModifiedProps(properties); |
| } catch (Exception e) { |
| logger.warn("Fail to reload config file {}", url, e); |
| throw new QueryProcessException( |
| String.format("Fail to reload config file %s because %s", url, e.getMessage())); |
| } |
| ReloadLevel reloadLevel = MetricConfigDescriptor.getInstance().loadHotProps(); |
| MetricService.getInstance().reloadProperties(reloadLevel); |
| } |
| |
| private void initMemoryAllocate(Properties properties) { |
| String memoryAllocateProportion = |
| properties.getProperty("write_read_schema_free_memory_proportion"); |
| if (memoryAllocateProportion != null) { |
| String[] proportions = memoryAllocateProportion.split(":"); |
| int proportionSum = 0; |
| for (String proportion : proportions) { |
| proportionSum += Integer.parseInt(proportion.trim()); |
| } |
| long maxMemoryAvailable = Runtime.getRuntime().maxMemory(); |
| if (proportionSum != 0) { |
| conf.setAllocateMemoryForStorageEngine( |
| maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum); |
| conf.setAllocateMemoryForRead( |
| maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum); |
| conf.setAllocateMemoryForSchema( |
| maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum); |
| } |
| } |
| |
| logger.info("allocateMemoryForRead = {}", conf.getAllocateMemoryForRead()); |
| logger.info("allocateMemoryForWrite = {}", conf.getAllocateMemoryForStorageEngine()); |
| logger.info("allocateMemoryForSchema = {}", conf.getAllocateMemoryForSchema()); |
| |
| initSchemaMemoryAllocate(properties); |
| initStorageEngineAllocate(properties); |
| |
| conf.setMaxQueryDeduplicatedPathNum( |
| Integer.parseInt( |
| properties.getProperty( |
| "max_deduplicated_path_num", |
| Integer.toString(conf.getMaxQueryDeduplicatedPathNum())))); |
| |
| conf.setEnableQueryMemoryEstimation( |
| Boolean.parseBoolean( |
| properties.getProperty( |
| "enable_query_memory_estimation", |
| Boolean.toString(conf.isEnableQueryMemoryEstimation())))); |
| |
| String queryMemoryAllocateProportion = |
| properties.getProperty("chunk_timeseriesmeta_free_memory_proportion"); |
| if (queryMemoryAllocateProportion != null) { |
| String[] proportions = queryMemoryAllocateProportion.split(":"); |
| int proportionSum = 0; |
| for (String proportion : proportions) { |
| proportionSum += Integer.parseInt(proportion.trim()); |
| } |
| long maxMemoryAvailable = conf.getAllocateMemoryForRead(); |
| if (proportionSum != 0) { |
| try { |
| conf.setAllocateMemoryForBloomFilterCache( |
| maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum); |
| conf.setAllocateMemoryForChunkCache( |
| maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum); |
| conf.setAllocateMemoryForTimeSeriesMetaDataCache( |
| maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum); |
| conf.setAllocateMemoryForCoordinator( |
| maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum); |
| conf.setAllocateMemoryForOperators( |
| maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / proportionSum); |
| conf.setAllocateMemoryForDataExchange( |
| maxMemoryAvailable * Integer.parseInt(proportions[5].trim()) / proportionSum); |
| conf.setAllocateMemoryForTimeIndex( |
| maxMemoryAvailable * Integer.parseInt(proportions[6].trim()) / proportionSum); |
| } catch (Exception e) { |
| throw new RuntimeException( |
| "Each subsection of configuration item chunkmeta_chunk_timeseriesmeta_free_memory_proportion" |
| + " should be an integer, which is " |
| + queryMemoryAllocateProportion); |
| } |
| } |
| } |
| |
| // metadata cache is disabled, we need to move all their allocated memory to other parts |
| if (!conf.isMetaDataCacheEnable()) { |
| long sum = |
| conf.getAllocateMemoryForBloomFilterCache() |
| + conf.getAllocateMemoryForChunkCache() |
| + conf.getAllocateMemoryForTimeSeriesMetaDataCache(); |
| conf.setAllocateMemoryForBloomFilterCache(0); |
| conf.setAllocateMemoryForChunkCache(0); |
| conf.setAllocateMemoryForTimeSeriesMetaDataCache(0); |
| long partForDataExchange = sum / 2; |
| long partForOperators = sum - partForDataExchange; |
| conf.setAllocateMemoryForDataExchange( |
| conf.getAllocateMemoryForDataExchange() + partForDataExchange); |
| conf.setAllocateMemoryForOperators(conf.getAllocateMemoryForOperators() + partForOperators); |
| } |
| } |
| |
| private void initStorageEngineAllocate(Properties properties) { |
| String allocationRatio = properties.getProperty("storage_engine_memory_proportion", "8:2"); |
| String[] proportions = allocationRatio.split(":"); |
| int proportionForMemTable = Integer.parseInt(proportions[0].trim()); |
| int proportionForCompaction = Integer.parseInt(proportions[1].trim()); |
| conf.setWriteProportion( |
| ((double) (proportionForMemTable) |
| / (double) (proportionForCompaction + proportionForMemTable))); |
| conf.setCompactionProportion( |
| ((double) (proportionForCompaction) |
| / (double) (proportionForCompaction + proportionForMemTable))); |
| } |
| |
| private void initSchemaMemoryAllocate(Properties properties) { |
| long schemaMemoryTotal = conf.getAllocateMemoryForSchema(); |
| |
| int proportionSum = 10; |
| int schemaRegionProportion = 8; |
| int schemaCacheProportion = 1; |
| int partitionCacheProportion = 0; |
| int lastCacheProportion = 1; |
| |
| String schemaMemoryAllocatePortion = |
| properties.getProperty("schema_memory_allocate_proportion"); |
| if (schemaMemoryAllocatePortion != null) { |
| conf.setDefaultSchemaMemoryConfig(false); |
| String[] proportions = schemaMemoryAllocatePortion.split(":"); |
| int loadedProportionSum = 0; |
| for (String proportion : proportions) { |
| loadedProportionSum += Integer.parseInt(proportion.trim()); |
| } |
| |
| if (loadedProportionSum != 0) { |
| proportionSum = loadedProportionSum; |
| schemaRegionProportion = Integer.parseInt(proportions[0].trim()); |
| schemaCacheProportion = Integer.parseInt(proportions[1].trim()); |
| partitionCacheProportion = Integer.parseInt(proportions[2].trim()); |
| lastCacheProportion = Integer.parseInt(proportions[3].trim()); |
| } |
| } else { |
| conf.setDefaultSchemaMemoryConfig(true); |
| } |
| |
| conf.setAllocateMemoryForSchemaRegion( |
| schemaMemoryTotal * schemaRegionProportion / proportionSum); |
| logger.info("allocateMemoryForSchemaRegion = {}", conf.getAllocateMemoryForSchemaRegion()); |
| |
| conf.setAllocateMemoryForSchemaCache(schemaMemoryTotal * schemaCacheProportion / proportionSum); |
| logger.info("allocateMemoryForSchemaCache = {}", conf.getAllocateMemoryForSchemaCache()); |
| |
| conf.setAllocateMemoryForPartitionCache( |
| schemaMemoryTotal * partitionCacheProportion / proportionSum); |
| logger.info("allocateMemoryForPartitionCache = {}", conf.getAllocateMemoryForPartitionCache()); |
| |
| conf.setAllocateMemoryForLastCache(schemaMemoryTotal * lastCacheProportion / proportionSum); |
| logger.info("allocateMemoryForLastCache = {}", conf.getAllocateMemoryForLastCache()); |
| } |
| |
| @SuppressWarnings("squid:S3518") // "proportionSum" can't be zero |
| private void loadUDFProps(Properties properties) { |
| String initialByteArrayLengthForMemoryControl = |
| properties.getProperty("udf_initial_byte_array_length_for_memory_control"); |
| if (initialByteArrayLengthForMemoryControl != null) { |
| conf.setUdfInitialByteArrayLengthForMemoryControl( |
| Integer.parseInt(initialByteArrayLengthForMemoryControl)); |
| } |
| |
| conf.setUdfDir(properties.getProperty("udf_root_dir", conf.getUdfDir())); |
| |
| String memoryBudgetInMb = properties.getProperty("udf_memory_budget_in_mb"); |
| if (memoryBudgetInMb != null) { |
| conf.setUdfMemoryBudgetInMB( |
| (float) |
| Math.min(Float.parseFloat(memoryBudgetInMb), 0.2 * conf.getAllocateMemoryForRead())); |
| } |
| |
| String groupByFillCacheSizeInMB = properties.getProperty("group_by_fill_cache_size_in_mb"); |
| if (groupByFillCacheSizeInMB != null) { |
| conf.setGroupByFillCacheSizeInMB(Float.parseFloat(groupByFillCacheSizeInMB)); |
| } |
| |
| String readerTransformerCollectorMemoryProportion = |
| properties.getProperty("udf_reader_transformer_collector_memory_proportion"); |
| if (readerTransformerCollectorMemoryProportion != null) { |
| String[] proportions = readerTransformerCollectorMemoryProportion.split(":"); |
| int proportionSum = 0; |
| for (String proportion : proportions) { |
| proportionSum += Integer.parseInt(proportion.trim()); |
| } |
| float maxMemoryAvailable = conf.getUdfMemoryBudgetInMB(); |
| try { |
| conf.setUdfReaderMemoryBudgetInMB( |
| maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum); |
| conf.setUdfTransformerMemoryBudgetInMB( |
| maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum); |
| conf.setUdfCollectorMemoryBudgetInMB( |
| maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum); |
| } catch (Exception e) { |
| throw new RuntimeException( |
| "Each subsection of configuration item udf_reader_transformer_collector_memory_proportion" |
| + " should be an integer, which is " |
| + readerTransformerCollectorMemoryProportion); |
| } |
| } |
| } |
| |
| private void loadTriggerProps(Properties properties) { |
| conf.setTriggerDir(properties.getProperty("trigger_root_dir", conf.getTriggerDir())); |
| |
| int tlogBufferSize = |
| Integer.parseInt( |
| properties.getProperty("tlog_buffer_size", Integer.toString(conf.getTlogBufferSize()))); |
| if (tlogBufferSize > 0) { |
| conf.setTlogBufferSize(tlogBufferSize); |
| } |
| |
| conf.setTriggerForwardMaxQueueNumber( |
| Integer.parseInt( |
| properties.getProperty( |
| "trigger_forward_max_queue_number", |
| Integer.toString(conf.getTriggerForwardMaxQueueNumber())))); |
| conf.setTriggerForwardMaxSizePerQueue( |
| Integer.parseInt( |
| properties.getProperty( |
| "trigger_forward_max_size_per_queue", |
| Integer.toString(conf.getTriggerForwardMaxSizePerQueue())))); |
| conf.setTriggerForwardBatchSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "trigger_forward_batch_size", |
| Integer.toString(conf.getTriggerForwardBatchSize())))); |
| conf.setTriggerForwardHTTPPoolSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "trigger_forward_http_pool_size", |
| Integer.toString(conf.getTriggerForwardHTTPPoolSize())))); |
| conf.setTriggerForwardHTTPPOOLMaxPerRoute( |
| Integer.parseInt( |
| properties.getProperty( |
| "trigger_forward_http_pool_max_per_route", |
| Integer.toString(conf.getTriggerForwardHTTPPOOLMaxPerRoute())))); |
| conf.setTriggerForwardMQTTPoolSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "trigger_forward_mqtt_pool_size", |
| Integer.toString(conf.getTriggerForwardMQTTPoolSize())))); |
| } |
| |
| private void loadCQProps(Properties properties) { |
| conf.setContinuousQueryThreadNum( |
| Integer.parseInt( |
| properties.getProperty( |
| "continuous_query_thread_num", |
| Integer.toString(conf.getContinuousQueryThreadNum())))); |
| if (conf.getContinuousQueryThreadNum() <= 0) { |
| conf.setContinuousQueryThreadNum(Runtime.getRuntime().availableProcessors() / 2); |
| } |
| |
| conf.setMaxPendingContinuousQueryTasks( |
| Integer.parseInt( |
| properties.getProperty( |
| "max_pending_continuous_query_tasks", |
| Integer.toString(conf.getMaxPendingContinuousQueryTasks())))); |
| if (conf.getMaxPendingContinuousQueryTasks() <= 0) { |
| conf.setMaxPendingContinuousQueryTasks(64); |
| } |
| |
| conf.setContinuousQueryMinimumEveryInterval( |
| DatetimeUtils.convertDurationStrToLong( |
| properties.getProperty("continuous_query_minimum_every_interval", "1s"), |
| conf.getTimestampPrecision())); |
| |
| conf.setCqlogBufferSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "cqlog_buffer_size", Integer.toString(conf.getCqlogBufferSize())))); |
| } |
| |
| public void loadClusterProps(Properties properties) { |
| String configNodeUrls = properties.getProperty(IoTDBConstant.TARGET_CONFIG_NODES); |
| if (configNodeUrls != null) { |
| try { |
| conf.setTargetConfigNodeList(NodeUrlUtils.parseTEndPointUrls(configNodeUrls)); |
| } catch (BadNodeUrlException e) { |
| logger.error( |
| "Config nodes are set in wrong format, please set them like 127.0.0.1:22277,127.0.0.1:22281"); |
| } |
| } |
| |
| conf.setInternalAddress( |
| properties.getProperty(IoTDBConstant.INTERNAL_ADDRESS, conf.getInternalAddress())); |
| |
| conf.setInternalPort( |
| Integer.parseInt( |
| properties.getProperty( |
| IoTDBConstant.INTERNAL_PORT, Integer.toString(conf.getInternalPort())))); |
| |
| conf.setDataRegionConsensusPort( |
| Integer.parseInt( |
| properties.getProperty( |
| "data_region_consensus_port", |
| Integer.toString(conf.getDataRegionConsensusPort())))); |
| |
| conf.setSchemaRegionConsensusPort( |
| Integer.parseInt( |
| properties.getProperty( |
| "schema_region_consensus_port", |
| Integer.toString(conf.getSchemaRegionConsensusPort())))); |
| } |
| |
| public void loadShuffleProps(Properties properties) { |
| conf.setMppDataExchangePort( |
| Integer.parseInt( |
| properties.getProperty( |
| "mpp_data_exchange_port", Integer.toString(conf.getMppDataExchangePort())))); |
| conf.setMppDataExchangeCorePoolSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "mpp_data_exchange_core_pool_size", |
| Integer.toString(conf.getMppDataExchangeCorePoolSize())))); |
| conf.setMppDataExchangeMaxPoolSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "mpp_data_exchange_max_pool_size", |
| Integer.toString(conf.getMppDataExchangeMaxPoolSize())))); |
| conf.setMppDataExchangeKeepAliveTimeInMs( |
| Integer.parseInt( |
| properties.getProperty( |
| "mpp_data_exchange_keep_alive_time_in_ms", |
| Integer.toString(conf.getMppDataExchangeKeepAliveTimeInMs())))); |
| |
| conf.setPartitionCacheSize( |
| Integer.parseInt( |
| properties.getProperty( |
| "partition_cache_size", Integer.toString(conf.getPartitionCacheSize())))); |
| |
| conf.setDriverTaskExecutionTimeSliceInMs( |
| Integer.parseInt( |
| properties.getProperty( |
| "driver_task_execution_time_slice_in_ms", |
| Integer.toString(conf.getDriverTaskExecutionTimeSliceInMs())))); |
| } |
| |
| /** Get default encode algorithm by data type */ |
| public TSEncoding getDefaultEncodingByType(TSDataType dataType) { |
| switch (dataType) { |
| case BOOLEAN: |
| return conf.getDefaultBooleanEncoding(); |
| case INT32: |
| return conf.getDefaultInt32Encoding(); |
| case INT64: |
| return conf.getDefaultInt64Encoding(); |
| case FLOAT: |
| return conf.getDefaultFloatEncoding(); |
| case DOUBLE: |
| return conf.getDefaultDoubleEncoding(); |
| default: |
| return conf.getDefaultTextEncoding(); |
| } |
| } |
| |
| // These configurations are received from config node when registering |
| public void loadGlobalConfig(TGlobalConfig globalConfig) { |
| conf.setSeriesPartitionExecutorClass(globalConfig.getSeriesPartitionExecutorClass()); |
| conf.setSeriesPartitionSlotNum(globalConfig.getSeriesPartitionSlotNum()); |
| conf.setPartitionInterval(globalConfig.timePartitionInterval); |
| conf.setReadConsistencyLevel(globalConfig.getReadConsistencyLevel()); |
| } |
| |
| public void loadRatisConfig(TRatisConfig ratisConfig) { |
| conf.setRatisConsensusLogAppenderBufferSizeMax(ratisConfig.getAppenderBufferSize()); |
| conf.setRatisConsensusSnapshotTriggerThreshold(ratisConfig.getSnapshotTriggerThreshold()); |
| conf.setRatisConsensusLogUnsafeFlushEnable(ratisConfig.isLogUnsafeFlushEnable()); |
| conf.setRatisConsensusLogSegmentSizeMax(ratisConfig.getLogSegmentSizeMax()); |
| conf.setRatisConsensusGrpcFlowControlWindow(ratisConfig.getGrpcFlowControlWindow()); |
| conf.setRatisConsensusLeaderElectionTimeoutMinMs(ratisConfig.getLeaderElectionTimeoutMin()); |
| conf.setRatisConsensusLeaderElectionTimeoutMaxMs(ratisConfig.getLeaderElectionTimeoutMax()); |
| } |
| |
| public void initClusterSchemaMemoryAllocate() { |
| if (!conf.isDefaultSchemaMemoryConfig()) { |
| // the config has already been updated as user config in properties file |
| return; |
| } |
| |
| // process the default schema memory allocate |
| |
| long schemaMemoryTotal = conf.getAllocateMemoryForSchema(); |
| |
| int proportionSum = 10; |
| int schemaRegionProportion = 5; |
| int schemaCacheProportion = 3; |
| int partitionCacheProportion = 1; |
| int lastCacheProportion = 1; |
| |
| conf.setAllocateMemoryForSchemaRegion( |
| schemaMemoryTotal * schemaRegionProportion / proportionSum); |
| logger.info( |
| "Cluster allocateMemoryForSchemaRegion = {}", conf.getAllocateMemoryForSchemaRegion()); |
| |
| conf.setAllocateMemoryForSchemaCache(schemaMemoryTotal * schemaCacheProportion / proportionSum); |
| logger.info( |
| "Cluster allocateMemoryForSchemaCache = {}", conf.getAllocateMemoryForSchemaCache()); |
| |
| conf.setAllocateMemoryForPartitionCache( |
| schemaMemoryTotal * partitionCacheProportion / proportionSum); |
| logger.info( |
| "Cluster allocateMemoryForPartitionCache = {}", conf.getAllocateMemoryForPartitionCache()); |
| |
| conf.setAllocateMemoryForLastCache(schemaMemoryTotal * lastCacheProportion / proportionSum); |
| logger.info("Cluster allocateMemoryForLastCache = {}", conf.getAllocateMemoryForLastCache()); |
| } |
| |
  // Initialization-on-demand holder idiom: the JVM creates INSTANCE lazily and thread-safely
  // the first time this nested class is referenced, with no explicit synchronization.
  private static class IoTDBDescriptorHolder {

    private static final IoTDBDescriptor INSTANCE = new IoTDBDescriptor();

    // Non-instantiable: this class exists only to hold the singleton instance.
    private IoTDBDescriptorHolder() {}
  }
| } |