Improve the speed of writing TiDB by batching the SQL execution (#7691)

diff --git a/CHANGES.md b/CHANGES.md
index 15e11e7..6d1e8b7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -66,6 +66,7 @@
 * Support gRPC sync grouped dynamic configurations.
 * Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query.
 * Fix `LogHandler` of `kafka-fetcher-plugin` cannot recognize namespace.
+* Improve the speed of writing TiDB by batching the SQL execution.
 
 #### UI
 
diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index 45468f5..5151546 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -30,6 +30,8 @@
     driver: org.h2.jdbcx.JdbcDataSource
     url: jdbc:h2:mem:skywalking-oap-db
     user: sa
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:100}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:1}
 ```
 
 ## OpenSearch
@@ -194,7 +196,7 @@
   selector: ${SW_STORAGE:mysql}
   mysql:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"}
       dataSource.user: ${SW_DATA_SOURCE_USER:root}
       dataSource.password: ${SW_DATA_SOURCE_PASSWORD:root@1234}
       dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
@@ -202,9 +204,12 @@
       dataSource.prepStmtCacheSqlLimit: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_LIMIT:2048}
       dataSource.useServerPrepStmts: ${SW_DATA_SOURCE_USE_SERVER_PREP_STMTS:true}
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
 ```
 All connection-related settings, including URL link, username, and password are found in `application.yml`. 
 Only part of the settings are listed here. See the [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for full settings.
+To understand the function of the parameter `rewriteBatchedStatements=true` in MySQL, see the [MySQL official document](https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements).
 
 ## TiDB
 Tested TiDB Server 4.0.8 version and MySQL Client driver 8.0.13 version are currently available.
@@ -215,7 +220,7 @@
   selector: ${SW_STORAGE:tidb}
   tidb:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/swtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/swtest?rewriteBatchedStatements=true"}
       dataSource.user: ${SW_DATA_SOURCE_USER:root}
       dataSource.password: ${SW_DATA_SOURCE_PASSWORD:""}
       dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
@@ -226,9 +231,12 @@
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
 ```
 All connection-related settings, including URL link, username, and password are found in `application.yml`. 
 For details on settings, refer to the configuration of *MySQL* above.
+To understand the function of the parameter `rewriteBatchedStatements=true` in TiDB, see the document of [TiDB best practices](https://docs.pingcap.com/tidb/stable/java-app-best-practices#use-batch-api).
 
 ## InfluxDB
 InfluxDB storage provides a time-series database as a new storage option.
@@ -266,6 +274,8 @@
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
 ```
 All connection-related settings, including URL link, username, and password are found in `application.yml`. 
 Only part of the settings are listed here. Please follow [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for full settings.
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index 5387a69..1e48ed0 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -114,16 +114,22 @@
 | - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_H2_QUERY_MAX_SIZE | 5000 |
 | - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include the logic column with multiple values. In H2, we use multiple physical columns to host the values: e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
 | - | - | numOfSearchableValuesPerTag | In a trace segment, this includes multiple spans with multiple tags. Different spans may have the same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
+| - | - | maxSizeOfBatchSql | The maximum size of batch size of SQL execution | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 100 |
+| - | - | asyncBatchPersistentPoolSize | async flush data into database thread size | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 1 |
 | - |mysql| - | MySQL Storage. The MySQL JDBC Driver is not in the dist. Please copy it into the oap-lib folder manually. | - | - |
 | - | - | properties | Hikari connection pool configurations. | - | Listed in the `application.yaml`. |
 | - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
 | - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include the logic column with multiple values. In MySQL, we use multiple physical columns to host the values, e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
 | - | - | numOfSearchableValuesPerTag | In a trace segment, this includes multiple spans with multiple tags. Different spans may have same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
+| - | - | maxSizeOfBatchSql | The maximum size of batch size of SQL execution | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 2000 |
+| - | - | asyncBatchPersistentPoolSize | async flush data into database thread size | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 4 |
 | - |postgresql| - | PostgreSQL storage. | - | - |
 | - | - | properties | Hikari connection pool configurations. | - | Listed in the `application.yaml`. |
 | - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
 | - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include the logic column with multiple values. In PostgreSQL, we use multiple physical columns to host the values, e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5` | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
 | - | - | numOfSearchableValuesPerTag | In a trace segment, this includes multiple spans with multiple tags. Different spans may have same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
+| - | - | maxSizeOfBatchSql | The maximum size of batch size of SQL execution | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 2000 |
+| - | - | asyncBatchPersistentPoolSize | async flush data into database thread size | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 4 |
 | - |influxdb| - | InfluxDB storage. |- | - |
 | - | - | url| InfluxDB connection URL. | SW_STORAGE_INFLUXDB_URL | http://localhost:8086|
 | - | - | user | User name of InfluxDB. | SW_STORAGE_INFLUXDB_USER | root|
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index dc13b21..979b174 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -158,9 +158,11 @@
     metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:100}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:1}
   mysql:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"}
       dataSource.user: ${SW_DATA_SOURCE_USER:root}
       dataSource.password: ${SW_DATA_SOURCE_PASSWORD:root@1234}
       dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
@@ -170,9 +172,11 @@
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   tidb:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}
       dataSource.user: ${SW_DATA_SOURCE_USER:root}
       dataSource.password: ${SW_DATA_SOURCE_PASSWORD:""}
       dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
@@ -183,6 +187,8 @@
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   influxdb:
     # InfluxDB configuration
     url: ${SW_STORAGE_INFLUXDB_URL:http://localhost:8086}
@@ -206,6 +212,8 @@
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   zipkin-elasticsearch:
     namespace: ${SW_NAMESPACE:""}
     clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
diff --git a/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java b/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java
index ccc7bf2..52b61c5 100644
--- a/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java
+++ b/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java
@@ -49,7 +49,7 @@
         assertThat(providerConfig.get("metadataQueryMaxSize"), is(5000));
         assertThat(providerConfig.get("properties"), instanceOf(Properties.class));
         Properties properties = (Properties) providerConfig.get("properties");
-        assertThat(properties.get("jdbcUrl"), is("jdbc:mysql://localhost:3306/swtest"));
+        assertThat(properties.get("jdbcUrl"), is("jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"));
     }
 
     @Test
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
new file mode 100644
index 0000000..a3d4848
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private final List<PrepareRequest> prepareRequests;
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. sql by key size: {}", prepareRequests.size());
+        }
+        if (prepareRequests.size() == 0) {
+            return;
+        }
+        String sql = prepareRequests.get(0).toString();
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            int pendingCount = 0;
+            for (int k = 0; k < prepareRequests.size(); k++) {
+                SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+                sqlExecutor.setParameters(preparedStatement);
+                preparedStatement.addBatch();
+                if (k > 0 && k % maxBatchSqlSize == 0) {
+                    executeBatch(preparedStatement, maxBatchSqlSize, sql);
+                    pendingCount = 0;
+                } else {
+                    pendingCount++;
+                }
+            }
+            if (pendingCount > 0) {
+                executeBatch(preparedStatement, pendingCount, sql);
+            }
+        }
+    }
+
+    private void executeBatch(PreparedStatement preparedStatement, int pendingCount, String sql) throws SQLException {
+        long start = System.currentTimeMillis();
+        preparedStatement.executeBatch();
+        if (log.isDebugEnabled()) {
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql);
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
index 65c5d02..8891b95 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
@@ -22,6 +22,7 @@
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.List;
+import lombok.EqualsAndHashCode;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 import org.slf4j.Logger;
@@ -30,6 +31,7 @@
 /**
  * A SQL executor.
  */
+@EqualsAndHashCode(of = "sql")
 public class SQLExecutor implements InsertRequest, UpdateRequest {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SQLExecutor.class);
@@ -44,14 +46,21 @@
 
     public void invoke(Connection connection) throws SQLException {
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
-
-        for (int i = 0; i < param.size(); i++) {
-            preparedStatement.setObject(i + 1, param.get(i));
-        }
-
+        setParameters(preparedStatement);
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param);
         }
         preparedStatement.execute();
     }
+
+    public void setParameters(PreparedStatement preparedStatement) throws SQLException {
+        for (int i = 0; i < param.size(); i++) {
+            preparedStatement.setObject(i + 1, param.get(i));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return sql;
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
index 06b8b26..90f400e 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
@@ -59,4 +59,16 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * The maximum size of batch size of SQL execution
+     *
+     * @since 8.8.0
+     */
+    private int maxSizeOfBatchSql = 100;
+    /**
+     * async batch execute pool size
+     *
+     * @since 8.8.0
+     */
+    private int asyncBatchPersistentPoolSize  = 1;
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index b074566..1a93880 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -117,7 +117,7 @@
         settings.setProperty("dataSource.password", config.getPassword());
         h2Client = new JDBCHikariCPClient(settings);
 
-        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
+        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
         this.registerServiceImplementation(
             StorageDAO.class,
             new H2StorageDAO(
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
index 2101e68..cd7d4fb 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
@@ -22,38 +22,35 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private final int maxBatchSqlSize;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int maxBatchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {}, maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, maxBatchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.maxBatchSqlSize = maxBatchSqlSize;
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
@@ -61,23 +58,27 @@
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
-
+        final Map<PrepareRequest, List<PrepareRequest>> batchRequestMap =
+                prepareRequests.stream().collect(Collectors.groupingBy(Function.identity()));
         try (Connection connection = h2Client.getConnection()) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
+            batchRequestMap.forEach((key, requests) -> {
                 try {
-                    SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
-                    sqlExecutor.invoke(connection);
+                    BatchSQLExecutor batchSQLExecutor =
+                            new BatchSQLExecutor(requests);
+                    batchSQLExecutor.invoke(connection, maxBatchSqlSize);
                 } catch (SQLException e) {
                     // Just avoid one execution failure makes the rest of batch failure.
                     log.error(e.getMessage(), e);
                 }
-            }
+            });
         } catch (SQLException | JDBCClientException e) {
-            log.error(e.getMessage(), e);
+            log.warn("execute sql failed, discard data size: {}", prepareRequests.size(), e);
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
     }
 
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
index bf0cccb..3e0ddfb 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
@@ -39,5 +39,17 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * The maximum size of batch size of SQL execution
+     *
+     * @since 8.8.0
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     *
+     * @since 8.8.0
+     */
+    private int asyncBatchPersistentPoolSize  = 4;
     private Properties properties;
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index 233431a..c9745b8 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -103,7 +103,7 @@
 
         mysqlClient = new JDBCHikariCPClient(config.getProperties());
 
-        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
+        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
         this.registerServiceImplementation(
             StorageDAO.class,
             new H2StorageDAO(
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java
index 67465bf..0715923 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java
@@ -25,5 +25,4 @@
 @Setter
 @Getter
 public class PostgreSQLStorageConfig extends MySQLStorageConfig {
-
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java
index 52800b3..031f510 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java
@@ -103,7 +103,7 @@
 
         postgresqlClient = new JDBCHikariCPClient(config.getProperties());
 
-        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(postgresqlClient));
+        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(postgresqlClient, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
         this.registerServiceImplementation(
                 StorageDAO.class,
                 new H2StorageDAO(
diff --git a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java
index 05d057c..935ec7a 100644
--- a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java
@@ -106,7 +106,7 @@
         
         mysqlClient = new JDBCHikariCPClient(config.getProperties());
 
-        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
+        this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
         this.registerServiceImplementation(
             StorageDAO.class,
             new H2StorageDAO(