/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.metron.elasticsearch.dao;

import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.metron.common.Constants;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertRetrieveLatestDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.metaalert.MetaScores;
import org.apache.metron.indexing.dao.metaalert.lucene.AbstractLuceneMetaAlertUpdateDao;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.InvalidCreateException;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;

public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao {

  private static final String INDEX_NOT_FOUND_INDICES_KEY = "es.index";

  private ElasticsearchDao elasticsearchDao;
  private MetaAlertRetrieveLatestDao retrieveLatestDao;
  private int pageSize;

  /**
   * Constructor an ElasticsearchMetaAlertUpdateDao
   * @param elasticsearchDao An UpdateDao to defer queries to.
   * @param retrieveLatestDao A RetrieveLatestDao for getting the current state of items being
   *     mutated.
   * @param config The meta alert config to use.
   */
  public ElasticsearchMetaAlertUpdateDao(
      ElasticsearchDao elasticsearchDao,
      MetaAlertRetrieveLatestDao retrieveLatestDao,
      MetaAlertConfig config,
      int pageSize
  ) {
    super(elasticsearchDao, retrieveLatestDao, config);
    this.elasticsearchDao = elasticsearchDao;
    this.retrieveLatestDao = retrieveLatestDao;
    this.pageSize = pageSize;
  }

  @Override
  @SuppressWarnings("unchecked")
  public Document createMetaAlert(MetaAlertCreateRequest request)
      throws InvalidCreateException, IOException {
    List<GetRequest> alertRequests = request.getAlerts();
    if (request.getAlerts().isEmpty()) {
      throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
    }
    if (request.getGroups().isEmpty()) {
      throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
    }

    // Retrieve the documents going into the meta alert and build it
    Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests);

    Document metaAlert = buildCreateDocument(alerts, request.getGroups(),
        MetaAlertConstants.ALERT_FIELD);
    MetaScores
        .calculateMetaScores(metaAlert, getConfig().getThreatTriageField(),
            getConfig().getThreatSort());
    // Add source type to be consistent with other sources and allow filtering
    metaAlert.getDocument()
        .put(getConfig().getSourceTypeField(), MetaAlertConstants.METAALERT_TYPE);

    // Start a list of updates / inserts we need to run
    Map<Document, Optional<String>> updates = new HashMap<>();
    updates.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex()));

    try {
      // We need to update the associated alerts with the new meta alerts, making sure existing
      // links are maintained.
      Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap(
          GetRequest::getGuid, GetRequest::getIndex));
      Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap(
          GetRequest::getGuid, GetRequest::getSensorType));
      for (Document alert : alerts) {
        if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
          // Use the index in the request if it exists
          Optional<String> index = guidToIndices.get(alert.getGuid());
          if (!index.isPresent()) {
            // Look up the index from Elasticsearch if one is not supplied in the request
            index = elasticsearchDao
                .getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid()));
            if (!index.isPresent()) {
              throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
            }
          }
          updates.put(alert, index);
        }
      }

      // Kick off any updates.
      update(updates);

      return metaAlert;
    } catch (IOException ioe) {
      throw new InvalidCreateException("Unable to create meta alert", ioe);
    }
  }

  @Override
  public Document update(Document update, Optional<String> index) throws IOException {
    if (MetaAlertConstants.METAALERT_TYPE.equals(update.getSensorType())) {
      // We've been passed an update to the meta alert.
      throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
    } else {
      Map<Document, Optional<String>> updates = new HashMap<>();
      updates.put(update, index);
      try {
        // We need to update an alert itself.  Only that portion of the update can be delegated.
        // We still need to get meta alerts potentially associated with it and update.
        SearchResponse response = getMetaAlertsForAlert(update.getGuid());
        Collection<Document> metaAlerts = response
                .getResults()
                .stream()
                .map(result -> toDocument(result, update.getTimestamp()))
                .collect(Collectors.toList());
        // Each meta alert needs to be updated with the new alert
        for (Document metaAlert : metaAlerts) {
          replaceAlertInMetaAlert(metaAlert, update);
          updates.put(metaAlert, Optional.of(METAALERTS_INDEX));
        }
      } catch (IndexNotFoundException e) {
        List<String> indicesNotFound = e.getMetadata(INDEX_NOT_FOUND_INDICES_KEY);
        // If no metaalerts have been created yet and the metaalerts index does not exist, assume no metaalerts exist for alert.
        // Otherwise throw the exception.
        if (indicesNotFound.size() != 1 || !METAALERTS_INDEX.equals(indicesNotFound.get(0))) {
          throw e;
        }
      }

      // Run the alert's update
      elasticsearchDao.batchUpdate(updates);

      return update;
    }
  }

  private Document toDocument(SearchResult result, Long timestamp) {
    Document document = Document.fromJSON(result.getSource());
    document.setTimestamp(timestamp);
    document.setDocumentID(result.getId());
    return document;
  }

  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
    return getUpdateDao().addCommentToAlert(request);
  }

  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException {
    return getUpdateDao().removeCommentFromAlert(request);
  }

  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest)
      throws IOException {
    return getUpdateDao().addCommentToAlert(request, latest);
  }

  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest)
      throws IOException {
    return getUpdateDao().removeCommentFromAlert(request, latest);
  }

  /**
   * Given an alert GUID, retrieve all associated meta alerts.
   * @param alertGuid The GUID of the child alert
   * @return The Elasticsearch response containing the meta alerts
   */
  protected SearchResponse getMetaAlertsForAlert(String alertGuid) throws IOException {
    QueryBuilder qb = boolQuery()
        .must(
            nestedQuery(
                MetaAlertConstants.ALERT_FIELD,
                boolQuery()
                    .must(termQuery(MetaAlertConstants.ALERT_FIELD + "." + Constants.GUID,
                        alertGuid)),
                ScoreMode.None
            ).innerHit(new InnerHitBuilder())
        )
        .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
    return ElasticsearchUtils
        .queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, getConfig().getMetaAlertIndex(),
            pageSize);
  }


  protected void replaceAlertInMetaAlert(Document metaAlert, Document alert) {
    boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert,
        Collections.singleton(alert.getGuid()));
    if (metaAlertUpdated) {
      addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
    }
  }
}
