Use a single global write thread pool instead of a per-TsFileWriter pool.
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc index 859bb25..ea4bf12 100644 --- a/cpp/src/common/global.cc +++ b/cpp/src/common/global.cc
@@ -26,11 +26,17 @@ #include <thread> +#ifdef ENABLE_THREADS +#include "common/thread_pool.h" +#endif #include "utils/injection.h" namespace common { ColumnSchema g_time_column_schema; +#ifdef ENABLE_THREADS +ThreadPool* g_write_thread_pool_ = nullptr; +#endif ConfigValue g_config_value_; void init_config_value() { @@ -137,6 +143,15 @@ g_time_column_schema.encoding_ = PLAIN; g_time_column_schema.compression_ = UNCOMPRESSED; g_time_column_schema.column_name_ = storage::TIME_COLUMN_NAME; +#ifdef ENABLE_THREADS + // (Re)create the global write thread pool with the configured size. + delete g_write_thread_pool_; + size_t pool_size = + g_config_value_.write_thread_count_ > 0 + ? static_cast<size_t>(g_config_value_.write_thread_count_) + : size_t{1}; + g_write_thread_pool_ = new ThreadPool(pool_size); +#endif return ret; }
diff --git a/cpp/src/common/global.h b/cpp/src/common/global.h index 3a4525b..ba5f4bd 100644 --- a/cpp/src/common/global.h +++ b/cpp/src/common/global.h
@@ -173,14 +173,20 @@ } // Set the number of threads for parallel writes. Must be called before -// constructing TsFileWriter instances — existing writers' thread pools -// are not resized at runtime. +// init_common() / libtsfile_init() — the global thread pool is created +// during initialization and is not resized at runtime. FORCE_INLINE int set_write_thread_count(int32_t count) { if (count < 1 || count > 64) return E_INVALID_ARG; g_config_value_.write_thread_count_ = count; return E_OK; } +#ifdef ENABLE_THREADS +class ThreadPool; +// Global write thread pool, created by init_common(). +extern ThreadPool* g_write_thread_pool_; +#endif + extern int init_common(); extern bool is_timestamp_column_name(const char* time_col_name); extern void cols_to_json(ByteStream* byte_stream,
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index cee742f..35520c8 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc
@@ -23,6 +23,9 @@ #include "chunk_writer.h" #include "common/config/config.h" +#ifdef ENABLE_THREADS +#include "common/thread_pool.h" +#endif #include "file/restorable_tsfile_io_writer.h" #include "file/tsfile_io_writer.h" #include "file/write_file.h" @@ -50,6 +53,10 @@ } void libtsfile_destroy() { +#ifdef ENABLE_THREADS + delete common::g_write_thread_pool_; + common::g_write_thread_pool_ = nullptr; +#endif ModStat::get_instance().destroy(); libtsfile::g_s_is_inited = false; } @@ -1262,7 +1269,7 @@ #ifdef ENABLE_THREADS if (g_config_value_.parallel_write_enabled_) { std::vector<std::future<int>> futures; - futures.push_back(thread_pool_.submit( + futures.push_back(g_write_thread_pool_->submit( [&write_time_in_segments, time_chunk_writer]() { return write_time_in_segments(time_chunk_writer); })); @@ -1270,7 +1277,7 @@ ValueChunkWriter* vcw = value_chunk_writers[k]; if (IS_NULL(vcw)) continue; uint32_t col_idx = field_columns[k]; - futures.push_back(thread_pool_.submit( + futures.push_back(g_write_thread_pool_->submit( [&write_value_in_segments, vcw, col_idx]() { return write_value_in_segments(vcw, col_idx); }));
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index bf40987..cb1b952 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h
@@ -33,9 +33,6 @@ #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" -#ifdef ENABLE_THREADS -#include "common/thread_pool.h" -#endif namespace storage { class WriteFile; @@ -198,13 +195,6 @@ int64_t record_count_for_next_mem_check_; bool write_file_created_; bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*) -#ifdef ENABLE_THREADS - static size_t get_thread_pool_size() { - int32_t count = common::g_config_value_.write_thread_count_; - return count > 0 ? static_cast<size_t>(count) : size_t{1}; - } - common::ThreadPool thread_pool_{get_thread_pool_size()}; -#endif int write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, bool* col_values,