/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.metamodel.elasticsearch.nativeclient;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.metamodel.DataContext;
import org.apache.metamodel.UpdateScript;
import org.apache.metamodel.UpdateSummary;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetHeader;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.data.SimpleDataSetHeader;
import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;

/**
 * DataContext implementation for ElasticSearch analytics engine.
 *
 * ElasticSearch has a data storage structure hierarchy that briefly goes like
 * this:
 * <ul>
 * <li>Index</li>
 * <li>Document type (short: Type) (within an index)</li>
 * <li>Documents (of a particular type)</li>
 * </ul>
 *
 * When instantiating this DataContext, an index name is provided. Within this
 * index, each document type is represented as a table.
 *
 * This implementation supports either automatic discovery of a schema or manual
 * specification of a schema, through the {@link SimpleTableDef} class.
 */
public class ElasticSearchDataContext extends AbstractElasticSearchDataContext {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);

    private final Client elasticSearchClient;

    /**
     * Constructs a {@link ElasticSearchDataContext}. This constructor accepts a
     * custom array of {@link SimpleTableDef}s which allows the user to define
     * their own view on the indexes in the engine. In addition, the tables
     * found by {@link #detectSchema()} are registered as dynamic table
     * definitions.
     *
     * @param client
     *            the ElasticSearch client
     * @param indexName
     *            the name of the ElasticSearch index to represent
     * @param tableDefinitions
     *            an array of {@link SimpleTableDef}s, which define the table
     *            and column model of the ElasticSearch index.
     * @throws IllegalArgumentException
     *             if {@code client} is null
     */
    public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) {
        super(indexName, tableDefinitions);

        if (client == null) {
            throw new IllegalArgumentException("ElasticSearch Client cannot be null");
        }
        this.elasticSearchClient = client;
        // The client must be assigned before this call: detectSchema() talks to the cluster through it.
        this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
    }

    /**
     * Constructs a {@link ElasticSearchDataContext} and automatically detects
     * the schema structure/view on all indexes (see
     * {@link #detectTable(ClusterState, String, String)}).
     *
     * @param client
     *            the ElasticSearch client
     * @param indexName
     *            the name of the ElasticSearch index to represent
     */
    public ElasticSearchDataContext(Client client, String indexName) {
        this(client, indexName, new SimpleTableDef[0]);
    }

    /**
     * Queries the cluster state for the configured index and builds one table
     * definition per document type mapping found in it.
     *
     * A document type whose table cannot be detected is logged and skipped, so
     * a single bad mapping does not prevent the remaining tables from being
     * detected. If the index has no metadata, no tables are detected.
     *
     * @return the detected table definitions, as ordered by
     *         {@code sortTables(...)}
     */
    @Override
    protected SimpleTableDef[] detectSchema() {
        logger.info("Detecting schema for index '{}'", indexName);

        final ClusterStateRequestBuilder clusterStateRequestBuilder = getElasticSearchClient().admin().cluster()
                .prepareState().setIndices(indexName);
        final ClusterState cs = clusterStateRequestBuilder.execute().actionGet().getState();

        final List<SimpleTableDef> result = new ArrayList<>();

        final IndexMetaData imd = cs.getMetaData().index(indexName);
        if (imd == null) {
            // index does not exist
            logger.warn("No metadata returned for index name '{}' - no tables will be detected.", indexName);
        } else {
            final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
            final ObjectLookupContainer<String> documentTypes = mappings.keys();
            for (final ObjectCursor<?> documentTypeCursor : documentTypes) {
                final String documentType = documentTypeCursor.value.toString();
                try {
                    final SimpleTableDef table = detectTable(cs, indexName, documentType);
                    result.add(table);
                } catch (Exception e) {
                    // Deliberately best-effort: log and continue with the next document type.
                    logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
                }
            }
        }
        return sortTables(result);
    }

    /**
     * Performs an analysis of an available index type in an ElasticSearch
     * {@link Client} client and tries to detect the index structure based on
     * the metadata provided by the java client.
     *
     * @param cs
     *            the ElasticSearch cluster state
     * @param indexName
     *            the name of the index
     * @param documentType
     *            the name of the index type
     * @return a table definition for ElasticSearch.
     * @throws IllegalArgumentException
     *             if the index does not exist in the cluster state, if the
     *             document type has no mapping in the index, or if the mapping
     *             has no "properties" map
     * @throws Exception
     *             if reading or parsing the mapping source fails
     */
    public static SimpleTableDef detectTable(ClusterState cs, String indexName, String documentType) throws Exception {
        logger.debug("Detecting table for document type '{}' in index '{}'", documentType, indexName);
        final IndexMetaData imd = cs.getMetaData().index(indexName);
        if (imd == null) {
            // index does not exist
            throw new IllegalArgumentException("No such index: " + indexName);
        }
        final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
        final MappingMetaData mappingMetaData = mappings.get(documentType);
        if (mappingMetaData == null) {
            throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType);
        }
        final Map<String, Object> mp = mappingMetaData.getSourceAsMap();
        final Object metadataProperties = mp.get("properties");
        // instanceof is false for null, so no separate null check is needed
        if (metadataProperties instanceof Map) {
            @SuppressWarnings("unchecked")
            final Map<String, ?> metadataPropertiesMap = (Map<String, ?>) metadataProperties;
            final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataPropertiesMap);
            return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes());
        }
        throw new IllegalArgumentException("No mapping properties defined for document type '" + documentType
                + "' in index: " + indexName);
    }

    /**
     * Materializes a table, pushing the where clause down to ElasticSearch
     * when {@link ElasticSearchUtils#createQueryBuilderForSimpleWhere} can
     * express it as a query; otherwise delegates to the superclass
     * implementation.
     *
     * @param table
     *            the table (document type) to query
     * @param selectItems
     *            the items to select
     * @param whereItems
     *            the filters, combined with {@link LogicalOperator#AND}
     * @param firstRow
     *            the 1-based index of the first row to return
     * @param maxRows
     *            the maximum number of rows to return
     * @return a data set with the matching rows
     */
    @Override
    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
            List<FilterItem> whereItems, int firstRow, int maxRows) {
        final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
                LogicalOperator.AND);
        if (queryBuilder != null) {
            // where clause can be pushed down to an ElasticSearch query
            final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder);
            final SearchResponse response = searchRequest.execute().actionGet();
            return new ElasticSearchDataSet(getElasticSearchClient(), response, selectItems);
        }
        return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
    }

    /**
     * Materializes a table without any filtering, starting from the first
     * row.
     *
     * @param table
     *            the table (document type) to query
     * @param columns
     *            the columns to select; each is wrapped in a {@link SelectItem}
     * @param maxRows
     *            the maximum number of rows to return
     * @return a data set with the rows of the table
     */
    @Override
    protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
        final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null);
        final SearchResponse response = searchRequest.execute().actionGet();
        final List<SelectItem> selectItems = columns.stream().map(SelectItem::new).collect(Collectors.toList());
        return new ElasticSearchDataSet(getElasticSearchClient(), response, selectItems);
    }

    /**
     * Builds a search request against the configured index, restricted to the
     * document type named after the given table.
     *
     * @param table
     *            the table whose name is used as document type
     * @param firstRow
     *            the 1-based index of the first row; values above 1 are
     *            translated to a zero-based "from" offset
     * @param maxRows
     *            the row limit; applied as the request size when
     *            {@code limitMaxRowsIsSet(maxRows)} holds, otherwise the
     *            request is set up as a scroll
     * @param queryBuilder
     *            the query to apply, or null for no query
     * @return the prepared (not yet executed) search request
     */
    private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) {
        final String documentType = table.getName();
        final SearchRequestBuilder searchRequest = getElasticSearchClient().prepareSearch(indexName).setTypes(documentType);
        if (firstRow > 1) {
            final int zeroBasedFrom = firstRow - 1;
            searchRequest.setFrom(zeroBasedFrom);
        }
        // NOTE(review): a "from" offset may end up combined with a scroll request here when
        // firstRow > 1 and no row limit is set - confirm this is supported by the targeted
        // ElasticSearch version.
        if (limitMaxRowsIsSet(maxRows)) {
            searchRequest.setSize(maxRows);
        } else {
            searchRequest.setScroll(TIMEOUT_SCROLL);
        }

        if (queryBuilder != null) {
            searchRequest.setQuery(queryBuilder);
        }

        return searchRequest;
    }

    /**
     * Looks up a single document by its ID using a GET request.
     *
     * @param table
     *            the table whose name is used as document type
     * @param selectItems
     *            the items that make up the header of the returned row
     * @param primaryKeyColumn
     *            the primary key column (not used by this implementation)
     * @param keyValue
     *            the key value; its string form is used as document ID
     * @return the row for the document, or null if {@code keyValue} is null or
     *         no such document exists
     */
    @Override
    protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
            Object keyValue) {
        if (keyValue == null) {
            return null;
        }

        final String documentType = table.getName();
        final String id = keyValue.toString();

        final GetResponse response = getElasticSearchClient().prepareGet(indexName, documentType, id).execute().actionGet();

        if (!response.isExists()) {
            return null;
        }

        final Map<String, Object> source = response.getSource();
        final String documentId = response.getId();

        final DataSetHeader header = new SimpleDataSetHeader(selectItems);

        return ElasticSearchUtils.createRow(source, documentId, header);
    }

    /**
     * Counts the documents of a document type by issuing a zero-size search
     * with a term query on "_type".
     *
     * @param table
     *            the table whose name is used as document type
     * @param whereItems
     *            the filters of the count query
     * @param functionApproximationAllowed
     *            not used by this implementation
     * @return the total hit count, or null if {@code whereItems} is not empty
     *         (filtered counts are not supported here)
     */
    @Override
    protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
        if (!whereItems.isEmpty()) {
            // not supported - will have to be done by counting client-side
            return null;
        }
        final String documentType = table.getName();
        final TermQueryBuilder query = QueryBuilders.termQuery("_type", documentType);
        // size(0): only the total hit count is needed, not the documents themselves
        final SearchResponse searchResponse =
                getElasticSearchClient().prepareSearch(indexName).setSource(new SearchSourceBuilder().size(0).query(query))
                        .execute().actionGet();
        return searchResponse.getHits().getTotalHits();
    }

    /**
     * Runs the given update script against an
     * {@link ElasticSearchUpdateCallback} bound to this data context.
     *
     * @param update
     *            the update script to run
     * @return the update summary reported by the callback
     */
    @Override
    public UpdateSummary executeUpdate(UpdateScript update) {
        final ElasticSearchUpdateCallback callback = new ElasticSearchUpdateCallback(this);
        update.run(callback);
        callback.onExecuteUpdateFinished();
        return callback.getUpdateSummary();
    }

    /**
     * Issues an index refresh request for the configured index and re-runs
     * schema detection.
     */
    @Override
    protected void onSchemaCacheRefreshed() {
        getElasticSearchClient().admin().indices().prepareRefresh(indexName).get();

        // NOTE(review): the detected table definitions are discarded here - confirm whether
        // they were meant to be stored (e.g. in dynamicTableDefinitions).
        detectSchema();
    }

    /**
     * Gets the {@link Client} that this {@link DataContext} is wrapping.
     *
     * @return the ElasticSearch client passed to the constructor; never null
     */
    public Client getElasticSearchClient() {
        return elasticSearchClient;
    }
}
