[Critical] Fix a low performance issue of metrics persistent in ES (#9437)

[Critical] Fix a low-performance issue of metrics persistent in the ElasticSearch storage implementation. One single metric could have to wait for an unnecessary 7~10s(System Env Variable SW_STORAGE_ES_FLUSH_INTERVAL).

This is a solid and critical performance issue, from the self-observability metric, we could find one metric that would need 7-10s to execute the data flush, which is not expected.
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 2e3e549..6d39be5 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -2,6 +2,9 @@
 
 #### Project
 
+* [Critical] Fix a low performance issue of metrics persistent in the ElasticSearch storage implementation. One single
+  metric could have to wait for an unnecessary 7~10s(System Env Variable `SW_STORAGE_ES_FLUSH_INTERVAL`) since 8.8.0 -
+  9.1.0 releases.
 * Upgrade Armeria to 1.16.0, Kubernetes Java client to 15.0.1.
 
 #### OAP Server
@@ -21,18 +24,25 @@
 * Remove legacy OAL `percentile` functions, `p99`, `p95`, `p90`, `p75`, `p50` func(s).
 * Revert [#8066](https://github.com/apache/skywalking/pull/8066). Keep all metrics persistent even it is default value.
 * Skip loading UI templates if folder is empty or doesn't exist.
-* Optimize ElasticSearch query performance by using `_mGet` and physical index name rather than alias in these scenarios,  (a) Metrics aggregation (b) Zipkin query (c) Metrics query (d) Log query
+* Optimize ElasticSearch query performance by using `_mGet` and physical index name rather than alias in these
+  scenarios,  (a) Metrics aggregation (b) Zipkin query (c) Metrics query (d) Log query
 * Support the `NETWORK` type of eBPF Profiling task.
 * Support `sumHistogram` in `MAL`.
-* [Breaking Change] Make the eBPF Profiling task support to the service instance level, index/table `ebpf_profiling_task` is required to be re-created when bump up from previous releases.
+* [Breaking Change] Make the eBPF Profiling task support to the service instance level,
+  index/table `ebpf_profiling_task` is required to be re-created when bump up from previous releases.
 * Fix race condition in Banyandb storage
 * Support `SUM_PER_MIN` downsampling in `MAL`.
 * Support `sumHistogramPercentile` in `MAL`.
 * Add `VIRTUAL_CACHE` to Layer, to fix conjectured Redis server, which icon can't show on the topology.
-* [Breaking Change] Elasticsearch storage merge all metrics/meter and records(without super datasets) indices into one physical index template `metrics-all` and `records-all` on the default setting.
-  Provide system environment variable(`SW_STORAGE_ES_LOGIC_SHARDING`) to shard metrics/records indices into multi-physical indices as the previous versions(one index template per metric/meter aggregation function).
-  In the current one index mode, users still could choose to adjust ElasticSearch's shard number(`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out.
-* [Breaking Change] Many columns of metrics and records model names are changed, The H2/Mysql/Tidb/Postgres storage users are required to remove all metrics-related and records-related tables for OAP to re-create or use a new database instance.
+* [Breaking Change] Elasticsearch storage merge all metrics/meter and records(without super datasets) indices into one
+  physical index template `metrics-all` and `records-all` on the default setting.
+  Provide system environment variable(`SW_STORAGE_ES_LOGIC_SHARDING`) to shard metrics/records indices into
+  multi-physical indices as the previous versions(one index template per metric/meter aggregation function).
+  In the current one index mode, users still could choose to adjust ElasticSearch's shard
+  number(`SW_STORAGE_ES_INDEX_SHARDS_NUMBER`) to scale out.
+* [Breaking Change] Many columns of metrics and records model names are changed, The H2/Mysql/Tidb/Postgres storage
+  users are required to remove all metrics-related and records-related tables for OAP to re-create or use a new database
+  instance.
 * Fix Zipkin trace query the max size of spans.
 * Add `tls` and `https` component IDs for Network Profiling.
 
@@ -54,7 +64,8 @@
 * Dashboard: Add metrics association.
 * Dashboard: Fix `FaaS-Root` document link and topology service relation dashboard link.
 * Dashboard: Fix `Mesh-Instance` metric `Throughput`.
-* Dashboard: Fix `Mesh-Service-Relation` metric `Throughput` and `Proxy Sidecar Internal Latency in Nanoseconds (Client Response)`.
+* Dashboard: Fix `Mesh-Service-Relation` metric `Throughput`
+  and `Proxy Sidecar Internal Latency in Nanoseconds (Client Response)`.
 * Dashboard: Fix `Mesh-Instance-Relation` metric `Throughput`.
 * Enhance associations for the Event widget.
 * Add event widgets in dashboard where applicable.
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
index a5d23e0..6841cb5 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
@@ -101,7 +101,7 @@
         }
     }
 
-    void flush() {
+    public void flush() {
         if (requests.isEmpty()) {
             return;
         }
diff --git a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
index 685b46b..5465063 100644
--- a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
+++ b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
@@ -241,7 +241,7 @@
           "i": "8",
           "type": "Widget",
           "widget": {
-            "title": "Persistence Execution Latency (ms)"
+            "title": "Persistence Execution Latency Per Metric Type (ms)"
           },
           "graph": {
             "type": "Line",
@@ -267,7 +267,7 @@
           "i": "9",
           "type": "Widget",
           "widget": {
-            "title": "Persistence Preparing Latency (ms)"
+            "title": "Persistence Preparing Latency Per Metric Type (ms)"
           },
           "graph": {
             "type": "Line",
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index 39b6a81..1a4aaa5 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -52,7 +52,8 @@
         if (bulkProcessor == null) {
             synchronized (this) {
                 if (bulkProcessor == null) {
-                    this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+                    this.bulkProcessor = getClient().createBulkProcessor(
+                        bulkActions, flushInterval, concurrentRequests);
                 }
             }
         }
@@ -65,19 +66,25 @@
         if (bulkProcessor == null) {
             synchronized (this) {
                 if (bulkProcessor == null) {
-                    this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+                    this.bulkProcessor = getClient().createBulkProcessor(
+                        bulkActions, flushInterval, concurrentRequests);
                 }
             }
         }
 
         if (CollectionUtils.isNotEmpty(prepareRequests)) {
-            return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
-                if (prepareRequest instanceof InsertRequest) {
-                    return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
-                } else {
-                    return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
-                }
-            }).toArray(CompletableFuture[]::new));
+            try {
+                return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
+                    if (prepareRequest instanceof InsertRequest) {
+                        return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
+                    } else {
+                        return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
+                    }
+                }).toArray(CompletableFuture[]::new));
+            } finally {
+                // Flush forcedly due to this kind of metrics has been pushed into the bulk processor.
+                bulkProcessor.flush();
+            }
         }
         return CompletableFuture.completedFuture(null);
     }