Optimize TTL mechanism for Elasticsearch storage, skip executed indices in one TTL rotation. (#9473)

* Optimize TTL mechanism for Elasticsearch storage, skip executed indices in one TTL rotation

* remove inspect
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 8603cdd..06d953b 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -51,6 +51,7 @@
 * Fix query services by serviceId error when Elasticsearch storage `SW_STORAGE_ES_QUERY_MAX_SIZE` > 10000.
 * Support sending alarm messages to Discord.
 * Fix query history process data failure.
+* Optimize TTL mechanism for Elasticsearch storage, skip executed indices in one TTL rotation.
 
 #### UI
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
index 2cc34e0..b5fd659 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import java.io.IOException;
-import java.util.List;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 
@@ -36,17 +35,4 @@
      * @throws IOException when error happens in the deletion process.
      */
     void deleteHistory(Model model, String timeBucketColumnName, int ttl) throws IOException;
-
-    /**
-     * Inspection is also driven by the TTL timer. This method is optional to implement, typically, this could be used
-     * to do routing inspection for timer series data, and get the latest status of existing data boundaries(oldest and
-     * latest).
-     *
-     * @param models               model list
-     * @param timeBucketColumnName column name represents the time. Right now, always {@link Metrics#TIME_BUCKET}
-     * @throws IOException when error happens in the deletion process.
-     */
-    default void inspect(List<Model> models, String timeBucketColumnName) throws IOException {
-
-    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 996d08d..aacb226 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -75,22 +75,17 @@
         IModelManager modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelManager.class);
         List<Model> models = modelGetter.allModels();
 
-        try {
-            List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
-            if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
-                log.info(
-                    "The selected first getAddress is {}. The remove stage is skipped.",
-                    remoteInstances.get(0).toString()
-                );
-                return;
-            }
-
-            log.info("Beginning to remove expired metrics from the storage.");
-            models.forEach(this::execute);
-        } finally {
-            log.info("Beginning to inspect data boundaries.");
-            this.inspect(models);
+        List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
+        if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
+            log.info(
+                "The selected first getAddress is {}. The remove stage is skipped.",
+                remoteInstances.get(0).toString()
+            );
+            return;
         }
+
+        log.info("Beginning to remove expired metrics from the storage.");
+        models.forEach(this::execute);
     }
 
     private void execute(Model model) {
@@ -116,15 +111,4 @@
             log.error(e.getMessage(), e);
         }
     }
-
-    private void inspect(List<Model> models) {
-        try {
-            moduleManager.find(StorageModule.NAME)
-                         .provider()
-                         .getService(IHistoryDeleteDAO.class)
-                         .inspect(models, Metrics.TIME_BUCKET);
-        } catch (IOException e) {
-            log.error(e.getMessage(), e);
-        }
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/IndicesMetadataCache.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/IndicesMetadataCache.java
deleted file mode 100644
index e1fb00c..0000000
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/IndicesMetadataCache.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
-
-import java.util.HashSet;
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * IndicesMetadataCache hosts all pseudo real time metadata of indices.
- */
-@Slf4j
-public class IndicesMetadataCache {
-    public static IndicesMetadataCache INSTANCE = new IndicesMetadataCache();
-
-    private volatile HashSet<String> existingIndices;
-
-    private IndicesMetadataCache() {
-        existingIndices = new HashSet<>();
-    }
-
-    public void update(List<String> indices) {
-        existingIndices = new HashSet<>(indices);
-    }
-
-    /**
-     * @return true if given index name exists currently.
-     */
-    public boolean isExisting(String index) {
-        return existingIndices.contains(index);
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
index 2070aaa..e1fadcf 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
@@ -20,20 +20,23 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache;
 import org.joda.time.DateTime;
 
 @Slf4j
 public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
+    private final Map<String, Long> indexLatestSuccess;
 
     public HistoryDeleteEsDAO(ElasticSearchClient client) {
         super(client);
+        this.indexLatestSuccess = new HashMap<>();
     }
 
     @Override
@@ -52,6 +55,13 @@
         }
         long deadline = Long.parseLong(new DateTime().plusDays(-ttl).toString("yyyyMMdd"));
         String tableName = IndexController.INSTANCE.getTableName(model);
+        Long latestSuccessDeadline = this.indexLatestSuccess.get(model.getName());
+        if (latestSuccessDeadline != null && deadline <= latestSuccessDeadline) {
+            if (log.isDebugEnabled()) {
+                log.debug("Index = {} already deleted, skip, deadline = {}, ttl = {}", tableName, deadline, ttl);
+            }
+            return;
+        }
         Collection<String> indices = client.retrievalIndexByAliases(tableName);
 
         if (log.isDebugEnabled()) {
@@ -79,32 +89,6 @@
         if (!leftIndices.contains(formattedLatestIndex)) {
             client.createIndex(latestIndex);
         }
-    }
-
-    @Override
-    public void inspect(List<Model> models, String timeBucketColumnName) {
-        List<String> indices = new ArrayList<>();
-        models.forEach(model -> {
-            if (!model.isTimeSeries()) {
-                return;
-            }
-
-            ElasticSearchClient client = getClient();
-
-            if (!model.isRecord()) {
-                if (!DownSampling.Minute.equals(model.getDownsampling())) {
-                    /*
-                     * As all metrics data in different down sampling rule of one day are in the same index, the inspection
-                     * operation is only required to run once.
-                     */
-                    return;
-                }
-            }
-            String tableName = IndexController.INSTANCE.getTableName(model);
-            Collection<String> indexes = client.retrievalIndexByAliases(tableName);
-
-            indices.addAll(indexes);
-        });
-        IndicesMetadataCache.INSTANCE.update(indices);
+        this.indexLatestSuccess.put(tableName, deadline);
     }
 }