/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;

import com.google.gson.Gson;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.response.Index;
import org.apache.skywalking.library.elasticsearch.response.IndexTemplate;
import org.apache.skywalking.library.elasticsearch.response.Mappings;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;

@Slf4j
public class StorageEsInstaller extends ModelInstaller {
    private final Gson gson = new Gson();
    private final StorageModuleElasticsearchConfig config;
    protected final ColumnTypeEsMapping columnTypeEsMapping;


    /**
     * The mappings of the template .
     */
    private final IndexStructures structures;

    public StorageEsInstaller(Client client,
                              ModuleManager moduleManager,
                              StorageModuleElasticsearchConfig config) {
        super(client, moduleManager);
        this.columnTypeEsMapping = new ColumnTypeEsMapping();
        this.config = config;
        this.structures = getStructures();
    }

    protected IndexStructures getStructures() {
        return new IndexStructures();
    }

    @Override
    protected boolean isExists(Model model) {
        ElasticSearchClient esClient = (ElasticSearchClient) client;
        String tableName = IndexController.INSTANCE.getTableName(model);
        IndexController.LogicIndicesRegister.registerRelation(model, tableName);
        if (!model.isTimeSeries()) {
            boolean exist = esClient.isExistsIndex(tableName);
            if (exist) {
                Mappings historyMapping = esClient.getIndex(tableName)
                                                  .map(Index::getMappings)
                                                  .orElseGet(Mappings::new);
                structures.putStructure(tableName, historyMapping);
                exist = structures.containsStructure(tableName, createMapping(model));
            }
            return exist;
        }
        boolean templateExists = esClient.isExistsTemplate(tableName);
        final Optional<IndexTemplate> template = esClient.getTemplate(tableName);
        boolean lastIndexExists = esClient.isExistsIndex(TimeSeriesUtils.latestWriteIndexName(model));

        if ((templateExists && !template.isPresent()) || (!templateExists && template.isPresent())) {
            throw new Error("[Bug warning] ElasticSearch client query template result is not consistent. " +
                                "Please file an issue to Apache SkyWalking.(https://github.com/apache/skywalking/issues)");
        }

        boolean exist = templateExists && lastIndexExists;

        if (exist) {
            structures.putStructure(
                tableName, template.get().getMappings()
            );
            exist = structures.containsStructure(tableName, createMapping(model));
        }
        return exist;
    }

    @Override
    protected void createTable(Model model) throws StorageException {
        if (model.isTimeSeries()) {
            createTimeSeriesTable(model);
        } else {
            createNormalTable(model);
        }
    }

    private void createNormalTable(Model model) throws StorageException {
        ElasticSearchClient esClient = (ElasticSearchClient) client;
        String tableName = IndexController.INSTANCE.getTableName(model);
        Mappings mapping = createMapping(model);
        if (!esClient.isExistsIndex(tableName)) {
            Map<String, Object> settings = createSetting(model);
            boolean isAcknowledged = esClient.createIndex(tableName, mapping, settings);
            log.info("create {} index finished, isAcknowledged: {}", tableName, isAcknowledged);
            if (!isAcknowledged) {
                throw new StorageException("create " + tableName + " index failure, ");
            }
        } else {
            Mappings historyMapping = esClient.getIndex(tableName)
                                              .map(Index::getMappings)
                                              .orElseGet(Mappings::new);
            structures.putStructure(tableName, mapping);
            Mappings appendMapping = structures.diffStructure(tableName, historyMapping);
            if (appendMapping.getProperties() != null && !appendMapping.getProperties().isEmpty()) {
                boolean isAcknowledged = esClient.updateIndexMapping(tableName, appendMapping);
                log.info("update {} index finished, isAcknowledged: {}, append mappings: {}", tableName,
                         isAcknowledged, appendMapping
                );
                if (!isAcknowledged) {
                    throw new StorageException("update " + tableName + " index failure");
                }
            }
        }
    }

    private void createTimeSeriesTable(Model model) throws StorageException {
        ElasticSearchClient esClient = (ElasticSearchClient) client;
        String tableName = IndexController.INSTANCE.getTableName(model);
        Map<String, Object> settings = createSetting(model);
        Mappings mapping = createMapping(model);
        String indexName = TimeSeriesUtils.latestWriteIndexName(model);
        try {
            boolean shouldUpdateTemplate = !esClient.isExistsTemplate(tableName);
            shouldUpdateTemplate = shouldUpdateTemplate || !structures.containsStructure(tableName, mapping);
            if (shouldUpdateTemplate) {
                structures.putStructure(tableName, mapping);
                boolean isAcknowledged = esClient.createOrUpdateTemplate(
                    tableName, settings, structures.getMapping(tableName), config.getIndexTemplateOrder());
                log.info("create {} index template finished, isAcknowledged: {}", tableName, isAcknowledged);
                if (!isAcknowledged) {
                    throw new IOException("create " + tableName + " index template failure");
                }
            }

            if (esClient.isExistsIndex(indexName)) {
                Mappings historyMapping = esClient.getIndex(indexName)
                        .map(Index::getMappings)
                        .orElseGet(Mappings::new);
                Mappings appendMapping = structures.diffStructure(tableName, historyMapping);
                if (appendMapping.getProperties() != null && !appendMapping.getProperties().isEmpty()) {
                    boolean isAcknowledged = esClient.updateIndexMapping(indexName, appendMapping);
                    log.info("update {} index finished, isAcknowledged: {}, append mappings: {}", indexName,
                            isAcknowledged, appendMapping
                    );
                    if (!isAcknowledged) {
                        throw new StorageException("update " + indexName + " time series index failure");
                    }
                }
            } else {
                boolean isAcknowledged = esClient.createIndex(indexName);
                log.info("create {} index finished, isAcknowledged: {}", indexName, isAcknowledged);
                if (!isAcknowledged) {
                    throw new StorageException("create " + indexName + " time series index failure");
                }
            }
        } catch (IOException e) {
            throw new StorageException("cannot create " + tableName + " index template", e);
        }
    }

    protected Map<String, Object> createSetting(Model model) throws StorageException {
        Map<String, Object> setting = new HashMap<>();

        setting.put("index.number_of_replicas", model.isSuperDataset()
            ? config.getSuperDatasetIndexReplicasNumber()
            : config.getIndexReplicasNumber());
        setting.put("index.number_of_shards", model.isSuperDataset()
            ? config.getIndexShardsNumber() * config.getSuperDatasetIndexShardsFactor()
            : config.getIndexShardsNumber());
        // Set the index refresh period as INT(flushInterval * 2/3). At the edge case,
        // in low traffic(traffic < bulkActions in the whole period), there is a possible case, 2 period bulks are included in
        // one index refresh rebuild operation, which could cause version conflicts. And this case can't be fixed
        // through `core/persistentPeriod` as the bulk fresh is not controlled by the persistent timer anymore.
        int indexRefreshInterval = config.getFlushInterval() * 2 / 3;
        if (indexRefreshInterval < 5) {
            // The refresh interval should not be less than 5 seconds (the recommended default value = 10s),
            // and the bulk flush interval should not be set less than 8s (the recommended default value = 15s).
            // This is a precaution case which makes ElasticSearch server has reasonable refresh interval,
            // even this value is set too small by end user manually.
            indexRefreshInterval = 5;
        }
        setting.put("index.refresh_interval", indexRefreshInterval + "s");
        List<ModelColumn> columns = IndexController.LogicIndicesRegister.getPhysicalTableColumns(model);
        setting.put("analysis", getAnalyzerSetting(columns));
        if (!StringUtil.isEmpty(config.getAdvanced())) {
            Map<String, Object> advancedSettings = gson.fromJson(config.getAdvanced(), Map.class);
            setting.putAll(advancedSettings);
        }
        return setting;
    }

    private Map getAnalyzerSetting(List<ModelColumn> analyzerTypes) throws StorageException {
        AnalyzerSetting analyzerSetting = new AnalyzerSetting();
        for (final ModelColumn column : analyzerTypes) {
            if (!column.getElasticSearchExtension().needMatchQuery()) {
                continue;
            }
            AnalyzerSetting setting = AnalyzerSetting.Generator.getGenerator(
                                                         column.getElasticSearchExtension().getAnalyzer())
                                                               .getGenerateFunc()
                                                               .generate(config);
            analyzerSetting.combine(setting);
        }
        return gson.fromJson(gson.toJson(analyzerSetting), Map.class);
    }

    protected Mappings createMapping(Model model) {
        Map<String, Object> properties = new HashMap<>();
        Mappings.Source source = new Mappings.Source();
        for (ModelColumn columnDefine : model.getColumns()) {
            final String type = columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType());
            if (columnDefine.getElasticSearchExtension().needMatchQuery()) {
                String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());

                Map<String, Object> originalColumn = new HashMap<>();
                originalColumn.put("type", type);
                originalColumn.put("copy_to", matchCName);
                properties.put(columnDefine.getColumnName().getName(), originalColumn);

                Map<String, Object> matchColumn = new HashMap<>();
                matchColumn.put("type", "text");
                matchColumn.put("analyzer", columnDefine.getElasticSearchExtension().getAnalyzer().getName());
                properties.put(matchCName, matchColumn);
            } else {
                Map<String, Object> column = new HashMap<>();
                column.put("type", type);
                // no index parameter is allowed for binary type, since ES 8.0
                if (columnDefine.isStorageOnly() && !"binary".equals(type)) {
                    column.put("index", false);
                }
                properties.put(columnDefine.getColumnName().getName(), column);
            }

            if (columnDefine.isIndexOnly()) {
                source.getExcludes().add(columnDefine.getColumnName().getName());
            }
        }

        if ((IndexController.INSTANCE.isMetricModel(model) && !config.isLogicSharding())
            || (config.isLogicSharding() && IndexController.INSTANCE.isFunctionMetric(model))) {
            Map<String, Object> column = new HashMap<>();
            column.put("type", "keyword");
            properties.put(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, column);
        }
        if (!config.isLogicSharding() && IndexController.INSTANCE.isRecordModel(model) && !model.isSuperDataset()) {
            Map<String, Object> column = new HashMap<>();
            column.put("type", "keyword");
            properties.put(IndexController.LogicIndicesRegister.RECORD_TABLE_NAME, column);
        }
        Mappings mappings = Mappings.builder()
                                    .type("type")
                                    .properties(properties)
                                    .source(source)
                                    .build();
        log.debug("elasticsearch index template setting: {}", mappings.toString());

        return mappings;
    }
}
