blob: 25733e725cec3362dc2dbcee0d7f5272644b55b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.index.elastic;
import org.apache.jackrabbit.JcrConstants;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import co.elastic.clients.elasticsearch._types.ExpandWildcard;
import co.elastic.clients.elasticsearch.cat.IndicesResponse;
import co.elastic.clients.elasticsearch.cat.indices.IndicesRecord;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
/**
* Deletes those remote elastic indexes for which no index definitions exist in repository. The remote indexes are not deleted
* the first time they are discovered. A dangling remote index is deleted in subsequent runs of this cleaner only after a
* given threshold of time has passed since the dangling index was first discovered.
*/
public class ElasticIndexCleaner implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ElasticIndexCleaner.class);
private final ElasticConnection elasticConnection;
private final NodeStore nodeStore;
private final String indexPrefix;
private final Map<String, Long> danglingRemoteIndicesMap;
private final int threshold;
/**
* Constructs a new instance of index cleaner with the given parameters.
* @param elasticConnection elastic connection to use
* @param nodeStore node store where index definitions exist
* @param thresholdInSeconds time in seconds before which a dangling remote index won't be deleted.
*/
public ElasticIndexCleaner(ElasticConnection elasticConnection, NodeStore nodeStore, int thresholdInSeconds) {
this.elasticConnection = elasticConnection;
this.nodeStore = nodeStore;
this.indexPrefix = elasticConnection.getIndexPrefix();
danglingRemoteIndicesMap = new HashMap<>();
this.threshold = thresholdInSeconds;
}
public void run() {
try {
NodeState root = nodeStore.getRoot();
IndicesResponse indicesRes = elasticConnection.getClient()
.cat().indices(r -> r
.index(elasticConnection.getIndexPrefix() + "*")
.expandWildcards(ExpandWildcard.Open));
String[] remoteIndices = indicesRes.valueBody()
.stream().map(IndicesRecord::index).toArray(String[]::new);
if (remoteIndices.length == 0) {
LOG.debug("No remote index found with prefix {}", indexPrefix);
return;
}
// remove entry of remote index names which don't exist now
List<String> externallyDeletedIndices = danglingRemoteIndicesMap.keySet().stream().
filter(index -> Arrays.stream(remoteIndices).noneMatch(remoteIndex -> remoteIndex.equals(index))).collect(Collectors.toList());
externallyDeletedIndices.forEach(danglingRemoteIndicesMap::remove);
Set<String> existingIndices = new HashSet<>();
AtomicBoolean shouldReturnSilently = new AtomicBoolean(false);
root.getChildNode(INDEX_DEFINITIONS_NAME).getChildNodeEntries().forEach(childNodeEntry -> {
PropertyState typeProperty = childNodeEntry.getNodeState().getProperty(IndexConstants.TYPE_PROPERTY_NAME);
String typeValue = typeProperty != null ? typeProperty.getValue(Type.STRING) : "";
/*
If index type is "elasticsearch" or "disabled", we try to find remote index name. In case of disabled lucene or
property indices, the remote index name would be null. So only elasticsearch indices are affected here.
*/
if (ElasticIndexDefinition.TYPE_ELASTICSEARCH.equals(typeValue) || "disabled".equals(typeValue)) {
String indexPath = "/" + INDEX_DEFINITIONS_NAME + "/" + childNodeEntry.getName();
String remoteIndexName = getRemoteIndexName(indexPrefix, childNodeEntry.getNodeState(), indexPath);
if (remoteIndexName != null) {
existingIndices.add(remoteIndexName);
} else if (ElasticIndexDefinition.TYPE_ELASTICSEARCH.equals(typeValue)){
/*
Didn't check for disabled indexes in this "else if" condition because an index could be disabled at three stages
and the following should hold -
Stage 1 - index definition was created with type="disabled". This means indexing has not been done for index and
hence remote index name and the remote index itself shouldn't exist
Stage 2 - index was disabled after indexing was complete - in this case we would find remote index name and won't
enter this condition
Stage 3 - index was disabled after indexing started but before it was complete. Ideally this should only be done in
case of some error in indexing and after interrupting and pausing indexing lane. So in this case it should be safe to
delete the partially created remote index as it should be recreated by reindexing.
*/
LOG.info("Could not obtain remote index name for {}. Won't delete any index.", childNodeEntry.getName());
shouldReturnSilently.set(true);
}
}
});
if (shouldReturnSilently.get()) {
return;
}
List<String> indicesToDelete = new ArrayList<>();
for (String remoteIndexName : remoteIndices) {
if (!existingIndices.contains(remoteIndexName)) {
Long curTime = System.currentTimeMillis();
Long oldTime = danglingRemoteIndicesMap.putIfAbsent(remoteIndexName, curTime);
if (threshold == 0 || (oldTime != null && curTime - oldTime >= TimeUnit.SECONDS.toMillis(threshold))) {
indicesToDelete.add(remoteIndexName);
danglingRemoteIndicesMap.remove(remoteIndexName);
}
} else {
danglingRemoteIndicesMap.remove(remoteIndexName);
}
}
if(!indicesToDelete.isEmpty()) {
DeleteIndexResponse response = elasticConnection.getClient().indices().delete(i -> i.index(indicesToDelete));
LOG.info("Deleting remote indices {}", indicesToDelete);
if (!response.acknowledged()) {
LOG.error("Could not delete remote indices " + indicesToDelete);
}
}
} catch (IOException e) {
LOG.error("Could not delete remote indices", e);
}
}
protected static @Nullable String getRemoteIndexName(String indexPrefix, NodeState indexNode, String indexPath) {
PropertyState nodeTypeProp = indexNode.getProperty(JcrConstants.JCR_PRIMARYTYPE);
if (nodeTypeProp == null || !IndexConstants.INDEX_DEFINITIONS_NODE_TYPE.equals(nodeTypeProp.getValue(Type.STRING))) {
throw new IllegalArgumentException("Not an index definition node state");
}
PropertyState type = indexNode.getProperty(IndexConstants.TYPE_PROPERTY_NAME);
String typeValue = type != null ? type.getValue(Type.STRING) : "";
if (!ElasticIndexDefinition.TYPE_ELASTICSEARCH.equals(typeValue) && !"disabled".equals(typeValue)) {
throw new IllegalArgumentException("Not an elastic index node");
}
try {
return ElasticIndexNameHelper.getRemoteIndexName(indexPrefix, indexPath, indexNode.builder());
} catch (IllegalStateException ise) { // this happens when there is no seed for the index
return null;
}
}
}