| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.jackrabbit.oak.plugins.index.elastic; |
| |
| import org.apache.jackrabbit.oak.api.PropertyState; |
| import org.apache.jackrabbit.oak.api.Type; |
| import org.apache.jackrabbit.oak.plugins.index.IndexConstants; |
| import org.apache.jackrabbit.oak.spi.state.NodeState; |
| import org.apache.jackrabbit.oak.spi.state.NodeStore; |
| import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; |
| import org.elasticsearch.action.support.IndicesOptions; |
| import org.elasticsearch.action.support.master.AcknowledgedResponse; |
| import org.elasticsearch.client.RequestOptions; |
| import org.elasticsearch.client.indices.GetIndexRequest; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; |
| |
| /** |
| * Deletes those remote elastic indexes for which no index definitions exist in repository. The remote indexes are not deleted |
| * the first time they are discovered. A dangling remote index is deleted in subsequent runs of this cleaner only after a |
| * given threshold of time has passed since the dangling index was first discovered. |
| */ |
| public class ElasticIndexCleaner implements Runnable { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ElasticIndexCleaner.class); |
| |
| private final ElasticConnection elasticConnection; |
| private final NodeStore nodeStore; |
| private final String indexPrefix; |
| private final Map<String, Long> danglingRemoteIndicesMap; |
| private final int threshold; |
| |
| /** |
| * Constructs a new instance of index cleaner with the given parameters. |
| * @param elasticConnection elastic connection to use |
| * @param nodeStore node store where index definitions exist |
| * @param thresholdInSeconds time in seconds before which a dangling remote index won't be deleted. |
| */ |
| public ElasticIndexCleaner(ElasticConnection elasticConnection, NodeStore nodeStore, int thresholdInSeconds) { |
| this.elasticConnection = elasticConnection; |
| this.nodeStore = nodeStore; |
| this.indexPrefix = elasticConnection.getIndexPrefix(); |
| danglingRemoteIndicesMap = new HashMap<>(); |
| this.threshold = thresholdInSeconds; |
| } |
| |
| public void run() { |
| try { |
| NodeState root = nodeStore.getRoot(); |
| GetIndexRequest getIndexRequest = new GetIndexRequest(elasticConnection.getIndexPrefix() + "*") |
| .indicesOptions(IndicesOptions.lenientExpandOpen()); |
| String[] remoteIndices = elasticConnection.getClient().indices() |
| .get(getIndexRequest, RequestOptions.DEFAULT).getIndices(); |
| if (remoteIndices == null || remoteIndices.length == 0) { |
| LOG.debug("No remote index found with prefix {}", indexPrefix); |
| return; |
| } |
| // remove entry of remote index names which don't exist now |
| List<String> externallyDeletedIndices = danglingRemoteIndicesMap.keySet().stream(). |
| filter(index -> Arrays.stream(remoteIndices).noneMatch(remoteIndex -> remoteIndex.equals(index))).collect(Collectors.toList()); |
| externallyDeletedIndices.forEach(danglingRemoteIndicesMap::remove); |
| Set<String> existingIndices = new HashSet<>(); |
| root.getChildNode(INDEX_DEFINITIONS_NAME).getChildNodeEntries().forEach(childNodeEntry -> { |
| PropertyState typeProperty = childNodeEntry.getNodeState().getProperty(IndexConstants.TYPE_PROPERTY_NAME); |
| String typeValue = typeProperty != null ? typeProperty.getValue(Type.STRING) : ""; |
| /* |
| If index type is "elasticsearch" or "disabled", we try to find remote index name. In case of disabled lucene or |
| property indices, the remote index name would be null. So only elasticsearch indices are affected here. |
| */ |
| if (typeValue.equals(ElasticIndexDefinition.TYPE_ELASTICSEARCH) || typeValue.equals("disabled")) { |
| String indexPath = "/" + INDEX_DEFINITIONS_NAME + "/" + childNodeEntry.getName(); |
| String remoteIndexName = ElasticIndexNameHelper.getRemoteIndexName(indexPrefix, childNodeEntry.getNodeState(), indexPath); |
| if (remoteIndexName != null) { |
| existingIndices.add(remoteIndexName); |
| } |
| } |
| }); |
| |
| List<String> indicesToDelete = new ArrayList<>(); |
| for (String remoteIndexName : remoteIndices) { |
| if (!existingIndices.contains(remoteIndexName)) { |
| Long curTime = System.currentTimeMillis(); |
| Long oldTime = danglingRemoteIndicesMap.putIfAbsent(remoteIndexName, curTime); |
| if (threshold == 0 || (oldTime != null && curTime - oldTime >= TimeUnit.SECONDS.toMillis(threshold))) { |
| indicesToDelete.add(remoteIndexName); |
| danglingRemoteIndicesMap.remove(remoteIndexName); |
| } |
| } else { |
| danglingRemoteIndicesMap.remove(remoteIndexName); |
| } |
| } |
| DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indicesToDelete.toArray(new String[]{})); |
| if (deleteIndexRequest.indices() != null && deleteIndexRequest.indices().length > 0) { |
| String indexString = Arrays.toString(deleteIndexRequest.indices()); |
| AcknowledgedResponse acknowledgedResponse = elasticConnection.getClient().indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); |
| LOG.info("Deleting remote indices {}", indexString); |
| if (!acknowledgedResponse.isAcknowledged()) { |
| LOG.error("Could not delete remote indices " + indexString); |
| } |
| } |
| } catch (IOException e) { |
| LOG.error("Could not delete remote indices", e); |
| } |
| } |
| } |