/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.jackrabbit.oak.plugins.index.elastic;

import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;

/**
 * Deletes those remote elastic indexes for which no index definitions exist in repository. The remote indexes are not deleted
 * the first time they are discovered. A dangling remote index is deleted in subsequent runs of this cleaner only after a
 * given threshold of time has passed since the dangling index was first discovered.
 */
public class ElasticIndexCleaner implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(ElasticIndexCleaner.class);

    private final ElasticConnection elasticConnection;
    private final NodeStore nodeStore;
    private final String indexPrefix;
    private final Map<String, Long> danglingRemoteIndicesMap;
    private final int threshold;

    /**
     * Constructs a new instance of index cleaner with the given parameters.
     * @param elasticConnection elastic connection to use
     * @param nodeStore node store where index definitions exist
     * @param thresholdInSeconds time in seconds before which a dangling remote index won't be deleted.
     */
    public ElasticIndexCleaner(ElasticConnection elasticConnection, NodeStore nodeStore, int thresholdInSeconds) {
        this.elasticConnection = elasticConnection;
        this.nodeStore = nodeStore;
        this.indexPrefix = elasticConnection.getIndexPrefix();
        danglingRemoteIndicesMap = new HashMap<>();
        this.threshold = thresholdInSeconds;
    }

    public void run() {
        try {
            NodeState root = nodeStore.getRoot();
            GetIndexRequest getIndexRequest = new GetIndexRequest(elasticConnection.getIndexPrefix() + "*")
                    .indicesOptions(IndicesOptions.lenientExpandOpen());
            String[] remoteIndices = elasticConnection.getClient().indices()
                    .get(getIndexRequest, RequestOptions.DEFAULT).getIndices();
            if (remoteIndices == null || remoteIndices.length == 0) {
                LOG.debug("No remote index found with prefix {}", indexPrefix);
                return;
            }
            // remove entry of remote index names which don't exist now
            List<String> externallyDeletedIndices = danglingRemoteIndicesMap.keySet().stream().
                    filter(index -> Arrays.stream(remoteIndices).noneMatch(remoteIndex -> remoteIndex.equals(index))).collect(Collectors.toList());
            externallyDeletedIndices.forEach(danglingRemoteIndicesMap::remove);
            Set<String> existingIndices = new HashSet<>();
            root.getChildNode(INDEX_DEFINITIONS_NAME).getChildNodeEntries().forEach(childNodeEntry -> {
                PropertyState typeProperty = childNodeEntry.getNodeState().getProperty(IndexConstants.TYPE_PROPERTY_NAME);
                String typeValue = typeProperty != null ? typeProperty.getValue(Type.STRING) : "";
                /*
                If index type is "elasticsearch" or "disabled", we try to find remote index name. In case of disabled lucene or
                property indices, the remote index name would be null. So only elasticsearch indices are affected here.
                 */
                if (typeValue.equals(ElasticIndexDefinition.TYPE_ELASTICSEARCH) || typeValue.equals("disabled")) {
                    String indexPath = "/" + INDEX_DEFINITIONS_NAME + "/" + childNodeEntry.getName();
                    String remoteIndexName = ElasticIndexNameHelper.getRemoteIndexName(indexPrefix, childNodeEntry.getNodeState(), indexPath);
                    if (remoteIndexName != null) {
                        existingIndices.add(remoteIndexName);
                    }
                }
            });

            List<String> indicesToDelete = new ArrayList<>();
            for (String remoteIndexName : remoteIndices) {
                if (!existingIndices.contains(remoteIndexName)) {
                    Long curTime = System.currentTimeMillis();
                    Long oldTime = danglingRemoteIndicesMap.putIfAbsent(remoteIndexName, curTime);
                    if (threshold == 0 || (oldTime != null && curTime - oldTime >= TimeUnit.SECONDS.toMillis(threshold))) {
                        indicesToDelete.add(remoteIndexName);
                        danglingRemoteIndicesMap.remove(remoteIndexName);
                    }
                } else {
                    danglingRemoteIndicesMap.remove(remoteIndexName);
                }
            }
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indicesToDelete.toArray(new String[]{}));
            if (deleteIndexRequest.indices() != null && deleteIndexRequest.indices().length > 0) {
                String indexString = Arrays.toString(deleteIndexRequest.indices());
                AcknowledgedResponse acknowledgedResponse = elasticConnection.getClient().indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
                LOG.info("Deleting remote indices {}", indexString);
                if (!acknowledgedResponse.isAcknowledged()) {
                    LOG.error("Could not delete remote indices " + indexString);
                }
            }
        } catch (IOException e) {
            LOG.error("Could not delete remote indices", e);
        }
    }
}
