Optimize TTL mechanism for Elasticsearch storage, skip executed indices in one TTL rotation. (#9473)
* Optimize TTL mechanism for Elasticsearch storage, skip executed indices in one TTL rotation
* remove inspect
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 8603cdd..06d953b 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -51,6 +51,7 @@
* Fix query services by serviceId error when Elasticsearch storage `SW_STORAGE_ES_QUERY_MAX_SIZE` > 10000.
* Support sending alarm messages to Discord.
* Fix query history process data failure.
+* Optimize the TTL mechanism for Elasticsearch storage: skip indices that were already processed in the current TTL rotation.
#### UI
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
index 2cc34e0..b5fd659 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
-import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -36,17 +35,4 @@
* @throws IOException when error happens in the deletion process.
*/
void deleteHistory(Model model, String timeBucketColumnName, int ttl) throws IOException;
-
- /**
- * Inspection is also driven by the TTL timer. This method is optional to implement, typically, this could be used
- * to do routing inspection for timer series data, and get the latest status of existing data boundaries(oldest and
- * latest).
- *
- * @param models model list
- * @param timeBucketColumnName column name represents the time. Right now, always {@link Metrics#TIME_BUCKET}
- * @throws IOException when error happens in the deletion process.
- */
- default void inspect(List<Model> models, String timeBucketColumnName) throws IOException {
-
- }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 996d08d..aacb226 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -75,22 +75,17 @@
IModelManager modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelManager.class);
List<Model> models = modelGetter.allModels();
- try {
- List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
- if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
- log.info(
- "The selected first getAddress is {}. The remove stage is skipped.",
- remoteInstances.get(0).toString()
- );
- return;
- }
-
- log.info("Beginning to remove expired metrics from the storage.");
- models.forEach(this::execute);
- } finally {
- log.info("Beginning to inspect data boundaries.");
- this.inspect(models);
+ List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
+ if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
+ log.info(
+ "The selected first getAddress is {}. The remove stage is skipped.",
+ remoteInstances.get(0).toString()
+ );
+ return;
}
+
+ log.info("Beginning to remove expired metrics from the storage.");
+ models.forEach(this::execute);
}
private void execute(Model model) {
@@ -116,15 +111,4 @@
log.error(e.getMessage(), e);
}
}
-
- private void inspect(List<Model> models) {
- try {
- moduleManager.find(StorageModule.NAME)
- .provider()
- .getService(IHistoryDeleteDAO.class)
- .inspect(models, Metrics.TIME_BUCKET);
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- }
- }
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/IndicesMetadataCache.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/IndicesMetadataCache.java
deleted file mode 100644
index e1fb00c..0000000
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/IndicesMetadataCache.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
-
-import java.util.HashSet;
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * IndicesMetadataCache hosts all pseudo real time metadata of indices.
- */
-@Slf4j
-public class IndicesMetadataCache {
- public static IndicesMetadataCache INSTANCE = new IndicesMetadataCache();
-
- private volatile HashSet<String> existingIndices;
-
- private IndicesMetadataCache() {
- existingIndices = new HashSet<>();
- }
-
- public void update(List<String> indices) {
- existingIndices = new HashSet<>(indices);
- }
-
- /**
- * @return true if given index name exists currently.
- */
- public boolean isExisting(String index) {
- return existingIndices.contains(index);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
index 2070aaa..e1fadcf 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
@@ -20,20 +20,23 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache;
import org.joda.time.DateTime;
@Slf4j
public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
+ private final Map<String, Long> indexLatestSuccess;
public HistoryDeleteEsDAO(ElasticSearchClient client) {
super(client);
+ this.indexLatestSuccess = new HashMap<>();
}
@Override
@@ -52,6 +55,13 @@
}
long deadline = Long.parseLong(new DateTime().plusDays(-ttl).toString("yyyyMMdd"));
String tableName = IndexController.INSTANCE.getTableName(model);
+ // Look up by tableName: the latest successful deadline is stored under tableName at the end of this method, so the same key must be used here for the skip to ever take effect.
+ Long latestSuccessDeadline = this.indexLatestSuccess.get(tableName);
+ if (latestSuccessDeadline != null && deadline <= latestSuccessDeadline) {
+ if (log.isDebugEnabled()) {
+ log.debug("Index = {} already deleted, skip, deadline = {}, ttl = {}", tableName, deadline, ttl);
+ }
+ return;
+ }
Collection<String> indices = client.retrievalIndexByAliases(tableName);
if (log.isDebugEnabled()) {
@@ -79,32 +89,6 @@
if (!leftIndices.contains(formattedLatestIndex)) {
client.createIndex(latestIndex);
}
- }
-
- @Override
- public void inspect(List<Model> models, String timeBucketColumnName) {
- List<String> indices = new ArrayList<>();
- models.forEach(model -> {
- if (!model.isTimeSeries()) {
- return;
- }
-
- ElasticSearchClient client = getClient();
-
- if (!model.isRecord()) {
- if (!DownSampling.Minute.equals(model.getDownsampling())) {
- /*
- * As all metrics data in different down sampling rule of one day are in the same index, the inspection
- * operation is only required to run once.
- */
- return;
- }
- }
- String tableName = IndexController.INSTANCE.getTableName(model);
- Collection<String> indexes = client.retrievalIndexByAliases(tableName);
-
- indices.addAll(indexes);
- });
- IndicesMetadataCache.INSTANCE.update(indices);
+ this.indexLatestSuccess.put(tableName, deadline);
}
}