[Critical] Fix a low-performance issue of metrics persistence in ES (#9437)
[Critical] Fix a low-performance issue of metrics persistence in the ElasticSearch storage implementation. A single metric could have to wait an unnecessary 7~10s before being flushed (controlled by the system environment variable `SW_STORAGE_ES_FLUSH_INTERVAL`).
This is a real and critical performance issue: from the self-observability metrics, we could see that a single metric could need 7-10s to execute the data flush, which is not expected.
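To make the root cause concrete: the bulk processor drains its buffer either when the buffered request count reaches `bulkActions` or when a periodic timer fires every `flushInterval` seconds. A low-volume metric type never hits the size threshold, so its batch sits in the buffer until the timer runs. Below is a minimal, self-contained sketch of that trigger model; `TimerOnlyBulkBuffer` and its members are illustrative names for this explanation, not the actual SkyWalking `BulkProcessor` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch of a bulk buffer with only two flush triggers: a size
// threshold and a periodic timer. All names here are illustrative.
class TimerOnlyBulkBuffer {
    private final List<String> requests = new ArrayList<>();
    private final int bulkActions;

    TimerOnlyBulkBuffer(int bulkActions, int flushIntervalSeconds) {
        this.bulkActions = bulkActions;
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Time-based trigger: fires once per flush interval
        // (what SW_STORAGE_ES_FLUSH_INTERVAL controls in the real client).
        scheduler.scheduleWithFixedDelay(
            this::flush, flushIntervalSeconds, flushIntervalSeconds, TimeUnit.SECONDS);
    }

    synchronized void add(String request) {
        requests.add(request);
        // Size-based trigger: a batch smaller than bulkActions never reaches it,
        // so its requests sit here until the timer fires -- up to flushInterval seconds.
        if (requests.size() >= bulkActions) {
            flush();
        }
    }

    synchronized void flush() {
        if (requests.isEmpty()) {
            return;
        }
        // Stand-in for sending the buffered requests as one bulk call.
        System.out.println("flushing " + requests.size() + " requests");
        requests.clear();
    }
}
```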
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 2e3e549..6d39be5 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -2,6 +2,9 @@
#### Project
+* [Critical] Fix a low-performance issue of metrics persistence in the ElasticSearch storage implementation. A single
+  metric could have to wait an unnecessary 7~10s (system environment variable `SW_STORAGE_ES_FLUSH_INTERVAL`) in the
+  8.8.0 - 9.1.0 releases.
* Upgrade Armeria to 1.16.0, Kubernetes Java client to 15.0.1.
#### OAP Server
@@ -21,18 +24,25 @@
* Remove legacy OAL `percentile` functions, `p99`, `p95`, `p90`, `p75`, `p50` func(s).
* Revert [#8066](https://github.com/apache/skywalking/pull/8066). Keep all metrics persistent even it is default value.
* Skip loading UI templates if folder is empty or doesn't exist.
-* Optimize ElasticSearch query performance by using `_mGet` and physical index name rather than alias in these scenarios, (a) Metrics aggregation (b) Zipkin query (c) Metrics query (d) Log query
+* Optimize ElasticSearch query performance by using `_mGet` and physical index names rather than aliases in these
+  scenarios: (a) Metrics aggregation (b) Zipkin query (c) Metrics query (d) Log query.
* Support the `NETWORK` type of eBPF Profiling task.
* Support `sumHistogram` in `MAL`.
-* [Breaking Change] Make the eBPF Profiling task support to the service instance level, index/table `ebpf_profiling_task` is required to be re-created when bump up from previous releases.
+* [Breaking Change] Make the eBPF Profiling task support the service instance level;
+  the index/table `ebpf_profiling_task` is required to be re-created when bumping up from previous releases.
* Fix race condition in Banyandb storage
* Support `SUM_PER_MIN` downsampling in `MAL`.
* Support `sumHistogramPercentile` in `MAL`.
* Add `VIRTUAL_CACHE` to Layer, to fix conjectured Redis server, which icon can't show on the topology.
-* [Breaking Change] Elasticsearch storage merge all metrics/meter and records(without super datasets) indices into one physical index template `metrics-all` and `records-all` on the default setting.
- Provide system environment variable(`SW_STORAGE_ES_LOGIC_SHARDING`) to shard metrics/records indices into multi-physical indices as the previous versions(one index template per metric/meter aggregation function).
- In the current one index mode, users still could choose to adjust ElasticSearch's shard number(`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out.
-* [Breaking Change] Many columns of metrics and records model names are changed, The H2/Mysql/Tidb/Postgres storage users are required to remove all metrics-related and records-related tables for OAP to re-create or use a new database instance.
+* [Breaking Change] Elasticsearch storage merges all metrics/meter and records (without super datasets) indices into
+  the physical index templates `metrics-all` and `records-all` on the default setting.
+  Provide a system environment variable (`SW_STORAGE_ES_LOGIC_SHARDING`) to shard metrics/records indices into
+  multiple physical indices as in previous versions (one index template per metric/meter aggregation function).
+  In the current one-index mode, users could still choose to adjust ElasticSearch's shard
+  number (`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out.
+* [Breaking Change] Many column names of the metrics and records models are changed. The H2/MySQL/TiDB/PostgreSQL
+  storage users are required to remove all metrics-related and records-related tables for the OAP server to re-create
+  them, or use a new database instance.
* Fix Zipkin trace query the max size of spans.
* Add `tls` and `https` component IDs for Network Profiling.
@@ -54,7 +64,8 @@
* Dashboard: Add metrics association.
* Dashboard: Fix `FaaS-Root` document link and topology service relation dashboard link.
* Dashboard: Fix `Mesh-Instance` metric `Throughput`.
-* Dashboard: Fix `Mesh-Service-Relation` metric `Throughput` and `Proxy Sidecar Internal Latency in Nanoseconds (Client Response)`.
+* Dashboard: Fix `Mesh-Service-Relation` metric `Throughput`
+ and `Proxy Sidecar Internal Latency in Nanoseconds (Client Response)`.
* Dashboard: Fix `Mesh-Instance-Relation` metric `Throughput`.
* Enhance associations for the Event widget.
* Add event widgets in dashboard where applicable.
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
index a5d23e0..6841cb5 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
@@ -101,7 +101,7 @@
}
}
- void flush() {
+ public void flush() {
if (requests.isEmpty()) {
return;
}
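The visibility change above matters because the DAO that needs to force a drain lives in a different package (`...storage.plugin.elasticsearch.base`) from `BulkProcessor` (`...library.elasticsearch.bulk`), so the package-private `flush()` was unreachable there. A hedged caller-side sketch against the illustrative buffer from the earlier example, not the real client API:

```java
import java.util.List;

// Usage sketch: add the whole batch, then drain on demand. TimerOnlyBulkBuffer
// is the illustrative class from the earlier sketch, not the real BulkProcessor.
final class ForcedFlushCaller {
    static void persist(TimerOnlyBulkBuffer processor, List<String> batch) {
        batch.forEach(processor::add);
        // Callable from another package only because flush() is now public.
        processor.flush();
    }
}
```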
diff --git a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
index 685b46b..5465063 100644
--- a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
+++ b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
@@ -241,7 +241,7 @@
"i": "8",
"type": "Widget",
"widget": {
- "title": "Persistence Execution Latency (ms)"
+ "title": "Persistence Execution Latency Per Metric Type (ms)"
},
"graph": {
"type": "Line",
@@ -267,7 +267,7 @@
"i": "9",
"type": "Widget",
"widget": {
- "title": "Persistence Preparing Latency (ms)"
+ "title": "Persistence Preparing Latency Per Metric Type (ms)"
},
"graph": {
"type": "Line",
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index 39b6a81..1a4aaa5 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -52,7 +52,8 @@
if (bulkProcessor == null) {
synchronized (this) {
if (bulkProcessor == null) {
- this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+ this.bulkProcessor = getClient().createBulkProcessor(
+ bulkActions, flushInterval, concurrentRequests);
}
}
}
@@ -65,19 +66,25 @@
if (bulkProcessor == null) {
synchronized (this) {
if (bulkProcessor == null) {
- this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+ this.bulkProcessor = getClient().createBulkProcessor(
+ bulkActions, flushInterval, concurrentRequests);
}
}
}
if (CollectionUtils.isNotEmpty(prepareRequests)) {
- return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
- if (prepareRequest instanceof InsertRequest) {
- return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
- } else {
- return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
- }
- }).toArray(CompletableFuture[]::new));
+ try {
+ return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
+ if (prepareRequest instanceof InsertRequest) {
+ return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
+ } else {
+ return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
+ }
+ }).toArray(CompletableFuture[]::new));
+ } finally {
+                // Flush forcibly, since this batch of metrics has already been pushed into the bulk processor.
+ bulkProcessor.flush();
+ }
}
return CompletableFuture.completedFuture(null);
}
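With this change, persistence latency for a batch is bounded by the bulk request itself rather than by `SW_STORAGE_ES_FLUSH_INTERVAL`, and placing the forced flush in a `finally` block guarantees the buffer is drained even if wrapping or adding a request throws. A condensed sketch of the pattern, reusing the illustrative buffer from the first example:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Condensed sketch of the fixed flow: requests that reached the buffer are
// flushed immediately, even when an exception escapes the adding loop.
final class BatchFlushPattern {
    static CompletableFuture<Void> flushBatch(TimerOnlyBulkBuffer processor, List<String> batch) {
        try {
            batch.forEach(processor::add);
            return CompletableFuture.completedFuture(null);
        } finally {
            // Forced flush: waiting time no longer depends on the periodic timer.
            processor.flush();
        }
    }
}
```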