blob: e1fadcf5df95066226081c72d51d7d373cb352cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.joda.time.DateTime;
@Slf4j
public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
private final Map<String, Long> indexLatestSuccess;
public HistoryDeleteEsDAO(ElasticSearchClient client) {
super(client);
this.indexLatestSuccess = new HashMap<>();
}
@Override
public void deleteHistory(Model model, String timeBucketColumnName, int ttl) {
ElasticSearchClient client = getClient();
if (!model.isRecord()) {
if (!DownSampling.Minute.equals(model.getDownsampling())) {
/*
* In ElasticSearch storage, the TTL triggers the index deletion directly.
* As all metrics data in different down sampling rule of one day are in the same index, the deletion operation
* is only required to run once.
*/
return;
}
}
long deadline = Long.parseLong(new DateTime().plusDays(-ttl).toString("yyyyMMdd"));
String tableName = IndexController.INSTANCE.getTableName(model);
Long latestSuccessDeadline = this.indexLatestSuccess.get(model.getName());
if (latestSuccessDeadline != null && deadline <= latestSuccessDeadline) {
if (log.isDebugEnabled()) {
log.debug("Index = {} already deleted, skip, deadline = {}, ttl = {}", tableName, deadline, ttl);
}
return;
}
Collection<String> indices = client.retrievalIndexByAliases(tableName);
if (log.isDebugEnabled()) {
log.debug("Deadline = {}, indices = {}, ttl = {}", deadline, indices, ttl);
}
List<String> prepareDeleteIndexes = new ArrayList<>();
List<String> leftIndices = new ArrayList<>();
for (String index : indices) {
long timeSeries = TimeSeriesUtils.isolateTimeFromIndexName(index);
if (deadline >= timeSeries) {
prepareDeleteIndexes.add(index);
} else {
leftIndices.add(index);
}
}
if (log.isDebugEnabled()) {
log.debug("Indices to be deleted: {}", prepareDeleteIndexes);
}
for (String prepareDeleteIndex : prepareDeleteIndexes) {
client.deleteByIndexName(prepareDeleteIndex);
}
String latestIndex = TimeSeriesUtils.latestWriteIndexName(model);
String formattedLatestIndex = client.formatIndexName(latestIndex);
if (!leftIndices.contains(formattedLatestIndex)) {
client.createIndex(latestIndex);
}
this.indexLatestSuccess.put(tableName, deadline);
}
}