/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.metron.indexing.dao.metaalert.lucene;

import static org.apache.metron.common.Constants.GUID;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.metron.common.Constants;
import org.apache.metron.indexing.dao.RetrieveLatestDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
import org.apache.metron.indexing.dao.metaalert.MetaAlertRetrieveLatestDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.metaalert.MetaAlertUpdateDao;
import org.apache.metron.indexing.dao.metaalert.MetaScores;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
import org.apache.metron.indexing.dao.update.PatchRequest;
import org.apache.metron.indexing.dao.update.UpdateDao;

public abstract class AbstractLuceneMetaAlertUpdateDao implements MetaAlertUpdateDao {

  private UpdateDao updateDao;
  private MetaAlertRetrieveLatestDao retrieveLatestDao;
  private MetaAlertConfig config;

  protected AbstractLuceneMetaAlertUpdateDao(
      UpdateDao updateDao,
      MetaAlertRetrieveLatestDao retrieveLatestDao,
      MetaAlertConfig config) {
    this.updateDao = updateDao;
    this.retrieveLatestDao = retrieveLatestDao;
    this.config = config;
  }

  public UpdateDao getUpdateDao() {
    return updateDao;
  }

  public MetaAlertRetrieveLatestDao getRetrieveLatestDao() {
    return retrieveLatestDao;
  }

  public MetaAlertConfig getConfig() {
    return config;
  }

  /**
   * Performs a patch operation on a document based on the result of @{link #isPatchAllowed(PatchRequest)}
   *
   * @param retrieveLatestDao DAO to retrieve the item to be patched
   * @param request The patch request.
   * @param timestamp Optionally a timestamp to set. If not specified then current time is used.
   * @return The patched document
   * @throws OriginalNotFoundException If no original document is found to patch.
   * @throws IOException If an error occurs performing the patch.
   */
  @Override
  public Document patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request,
      Optional<Long> timestamp)
      throws OriginalNotFoundException, IOException {
    if (isPatchAllowed(request)) {
      return updateDao.patch(retrieveLatestDao, request, timestamp);
    } else {
      throw new IllegalArgumentException(
          "Meta alert patches are not allowed for /alert or /status paths.  "
              + "Please use the add/remove alert or update status functions instead.");
    }
  }

  @Override
  public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) {
    throw new UnsupportedOperationException("Meta alerts do not allow for bulk updates");
  }

  /**
   * Build the Document representing a meta alert to be created.
   * @param alerts The Elasticsearch results for the meta alerts child documents
   * @param groups The groups used to create this meta alert
   * @return A Document representing the new meta alert
   */
  protected Document buildCreateDocument(Iterable<Document> alerts, List<String> groups,
      String alertField) {
    // Need to create a Document from the multiget. Scores will be calculated later
    Map<String, Object> metaSource = new HashMap<>();
    List<Map<String, Object>> alertList = new ArrayList<>();
    for (Document alert : alerts) {
      alertList.add(alert.getDocument());
    }
    metaSource.put(alertField, alertList);

    // Add any meta fields
    String guid = UUID.randomUUID().toString();
    metaSource.put(GUID, guid);
    metaSource.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
    metaSource.put(MetaAlertConstants.GROUPS_FIELD, groups);
    metaSource.put(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());

    return new Document(metaSource, guid, MetaAlertConstants.METAALERT_TYPE,
        System.currentTimeMillis());
  }

  /**
   * Builds the set of updates when alerts are removed from a meta alert
   * @param metaAlert The meta alert to remove alerts from
   * @param alerts The alert Documents to be removed
   * @return The updates to be run
   * @throws IOException If an error is thrown.
   */
  @SuppressWarnings("unchecked")
  protected Map<Document, Optional<String>> buildRemoveAlertsFromMetaAlert(Document metaAlert,
      Iterable<Document> alerts)
      throws IOException {
    Map<Document, Optional<String>> updates = new HashMap<>();

    List<String> alertGuids = new ArrayList<>();
    for (Document alert : alerts) {
      alertGuids.add(alert.getGuid());
    }
    List<Map<String, Object>> alertsBefore = new ArrayList<>();
    Map<String, Object> documentBefore = metaAlert.getDocument();
    if (documentBefore.containsKey(MetaAlertConstants.ALERT_FIELD)) {
      alertsBefore
          .addAll((List<Map<String, Object>>) documentBefore.get(MetaAlertConstants.ALERT_FIELD));
    }
    boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, alertGuids);
    if (metaAlertUpdated) {
      List<Map<String, Object>> alertsAfter = (List<Map<String, Object>>) metaAlert.getDocument()
          .get(MetaAlertConstants.ALERT_FIELD);
      if (alertsAfter.size() < alertsBefore.size() && alertsAfter.size() == 0) {
        throw new IllegalStateException("Removing these alerts will result in an empty meta alert.  Empty meta alerts are not allowed.");
      }
      MetaScores
          .calculateMetaScores(metaAlert, config.getThreatTriageField(), config.getThreatSort());
      updates.put(metaAlert, Optional.of(config.getMetaAlertIndex()));
      for (Document alert : alerts) {
        if (removeMetaAlertFromAlert(metaAlert.getGuid(), alert)) {
          updates.put(alert, Optional.empty());
        }
      }
    }
    return updates;
  }

  /**
   * Adds alerts to a metaalert, based on a list of GetRequests provided for retrieval.
   * @param metaAlertGuid The GUID of the metaalert to be given new children.
   * @param alertRequests GetRequests for the appropriate alerts to add.
   * @return The updated metaalert with alerts added.
   */
  @Override
  public Document addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
          throws IOException {
    Document metaAlert = retrieveLatestDao
            .getLatest(metaAlertGuid, MetaAlertConstants.METAALERT_TYPE);
    if (metaAlert == null) {
      throw new IOException(String.format("Unable to add alerts to meta alert.  Meta alert with guid %s cannot be found.",
              metaAlertGuid));
    }
    if (MetaAlertStatus.ACTIVE.getStatusString()
            .equals(metaAlert.getDocument().get(MetaAlertConstants.STATUS_FIELD))) {
      Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests);
      Set<String> missingAlerts = getMissingAlerts(alertRequests, alerts);
      if (!missingAlerts.isEmpty()) {
        throw new IOException(String.format("Unable to add alerts to meta alert.  Alert with guid %s cannot be found.",
                missingAlerts.iterator().next()));
      }
      Map<Document, Optional<String>> updates = buildAddAlertToMetaAlertUpdates(metaAlert, alerts);
      update(updates);
      return metaAlert;
    } else {
      throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
    }
  }

  /**
   * Removes alerts from a metaalert, based on a list of GetRequests provided for retrieval.
   * @param metaAlertGuid The GUID of the metaalert to remove children from.
   * @param alertRequests A list of GetReqests that will provide the alerts to remove
   * @return The updated metaalert with alerts removed.
   * @throws IllegalStateException If the metaalert is inactive.
   */
  @Override
  @SuppressWarnings("unchecked")
  public Document removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
      throws IOException, IllegalStateException {
    Document metaAlert = retrieveLatestDao
        .getLatest(metaAlertGuid, MetaAlertConstants.METAALERT_TYPE);
    if (metaAlert == null) {
      throw new IOException(String.format("Unable to remove alerts from meta alert.  Meta alert with guid %s cannot be found.",
              metaAlertGuid));
    }
    if (MetaAlertStatus.ACTIVE.getStatusString()
        .equals(metaAlert.getDocument().get(MetaAlertConstants.STATUS_FIELD))) {
      Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests);
      Set<String> missingAlerts = getMissingAlerts(alertRequests, alerts);
      if (!missingAlerts.isEmpty()) {
        throw new IOException(String.format("Unable to remove alerts from meta alert.  Alert with guid %s cannot be found.",
                missingAlerts.iterator().next()));
      }
      Map<Document, Optional<String>> updates = buildRemoveAlertsFromMetaAlert(metaAlert, alerts);
      update(updates);
      return metaAlert;
    } else {
      throw new IllegalStateException("Removing alerts from an INACTIVE meta alert is not allowed");
    }
  }

  /**
   * Removes a given set of alerts from a given alert. AlertGuids that are not found are ignored.
   * @param metaAlert The metaalert to be mutated.
   * @param alertGuids The alerts to remove from the metaaelrt.
   * @return True if the metaAlert changed, false otherwise.
   */
  protected boolean removeAlertsFromMetaAlert(Document metaAlert, Collection<String> alertGuids) {
    // If we don't have child alerts or nothing is being removed, immediately return false.
    if (!metaAlert.getDocument().containsKey(MetaAlertConstants.ALERT_FIELD)
        || alertGuids.size() == 0) {
      return false;
    }

    @SuppressWarnings("unchecked")
    List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument()
        .get(MetaAlertConstants.ALERT_FIELD);
    int previousSize = currentAlerts.size();
    // Only remove an alert if it is in the meta alert
    currentAlerts.removeIf(currentAlert -> alertGuids.contains(currentAlert.get(GUID)));
    return currentAlerts.size() != previousSize;
  }

  @Override
  public Document updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
      throws IOException {
    Document metaAlert = retrieveLatestDao
        .getLatest(metaAlertGuid, MetaAlertConstants.METAALERT_TYPE);
    if (metaAlert == null) {
      throw new IOException(String.format("Unable to update meta alert status.  Meta alert with guid %s cannot be found.",
              metaAlertGuid));
    }
    String currentStatus = (String) metaAlert.getDocument().get(MetaAlertConstants.STATUS_FIELD);
    boolean metaAlertUpdated = !status.getStatusString().equals(currentStatus);
    if (metaAlertUpdated) {
      List<GetRequest> getRequests = new ArrayList<>();
      @SuppressWarnings("unchecked")
      List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument()
          .get(MetaAlertConstants.ALERT_FIELD);
      currentAlerts.stream()
          .forEach(currentAlert -> getRequests.add(new GetRequest((String) currentAlert.get(GUID),
              (String) currentAlert.get(config.getSourceTypeField()))));
      Iterable<Document> alerts = retrieveLatestDao.getAllLatest(getRequests);
      Map<Document, Optional<String>> updates = buildStatusChangeUpdates(metaAlert, alerts, status);
      update(updates);
    }
    return metaAlert;
  }

  /**
   * Given a Metaalert and a status change, builds the set of updates to be run.
   * @param metaAlert The metaalert to have status changed
   * @param alerts The alerts to change status for
   * @param status The status to change to
   * @return The updates to be run
   */
  protected Map<Document, Optional<String>> buildStatusChangeUpdates(Document metaAlert,
      Iterable<Document> alerts,
      MetaAlertStatus status) {
    metaAlert.getDocument().put(MetaAlertConstants.STATUS_FIELD, status.getStatusString());

    Map<Document, Optional<String>> updates = new HashMap<>();
    updates.put(metaAlert, Optional.of(config.getMetaAlertIndex()));

    for (Document alert : alerts) {
      boolean metaAlertAdded = false;
      boolean metaAlertRemoved = false;
      // If we're making it active add add the meta alert guid for every alert.
      if (MetaAlertStatus.ACTIVE.equals(status)) {
        metaAlertAdded = addMetaAlertToAlert(metaAlert.getGuid(), alert);
      }
      // If we're making it inactive, remove the meta alert guid from every alert.
      if (MetaAlertStatus.INACTIVE.equals(status)) {
        metaAlertRemoved = removeMetaAlertFromAlert(metaAlert.getGuid(), alert);
      }
      if (metaAlertAdded || metaAlertRemoved) {
        updates.put(alert, Optional.empty());
      }
    }
    return updates;
  }

  /**
   * Builds the updates to be run based on a given metaalert and a set of new alerts for the it.
   * @param metaAlert The base metaalert we're building updates for
   * @param alerts The alerts being added
   * @return The set of resulting updates.
   */
  protected Map<Document, Optional<String>> buildAddAlertToMetaAlertUpdates(Document metaAlert,
      Iterable<Document> alerts) {
    Map<Document, Optional<String>> updates = new HashMap<>();
    boolean metaAlertUpdated = addAlertsToMetaAlert(metaAlert, alerts);
    if (metaAlertUpdated) {
      MetaScores
          .calculateMetaScores(metaAlert, config.getThreatTriageField(), config.getThreatSort());
      updates.put(metaAlert, Optional.of(config.getMetaAlertIndex()));
      for (Document alert : alerts) {
        if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
          updates.put(alert, Optional.empty());
        }
      }
    }
    return updates;
  }

  /**
   * Adds the provided alerts to a given metaalert.
   * @param metaAlert The metaalert to be given new children.
   * @param alerts The alerts to be added as children
   * @return True if metaalert is modified, false otherwise.
   */
  protected boolean addAlertsToMetaAlert(Document metaAlert, Iterable<Document> alerts) {
    boolean alertAdded = false;
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument()
        .get(MetaAlertConstants.ALERT_FIELD);
    if (currentAlerts == null) {
      currentAlerts = new ArrayList<>();
      metaAlert.getDocument().put(MetaAlertConstants.ALERT_FIELD, currentAlerts);
    }
    Set<String> currentAlertGuids = currentAlerts.stream().map(currentAlert ->
        (String) currentAlert.get(GUID)).collect(Collectors.toSet());
    for (Document alert : alerts) {
      String alertGuid = alert.getGuid();
      // Only add an alert if it isn't already in the meta alert
      if (!currentAlertGuids.contains(alertGuid)) {
        currentAlerts.add(alert.getDocument());
        alertAdded = true;
      }
    }
    return alertAdded;
  }

  /**
   * Calls the single update variant if there's only one update, otherwise calls batch.
   * MetaAlerts may defer to an implementation specific IndexDao.
   * @param updates The list of updates to run
   * @throws IOException If there's an update error
   */
  protected void update(Map<Document, Optional<String>> updates)
      throws IOException {
    if (updates.size() == 1) {
      Entry<Document, Optional<String>> singleUpdate = updates.entrySet().iterator().next();
      updateDao.update(singleUpdate.getKey(), singleUpdate.getValue());
    } else if (updates.size() > 1) {
      updateDao.batchUpdate(updates);
    } // else we have no updates, so don't do anything
  }

  protected Set<String> getMissingAlerts(List<GetRequest> alertRequests, Iterable<Document> results) {
    Set<String> requestGuids = alertRequests.stream().map(GetRequest::getGuid).collect(Collectors.toSet());
    Set<String> resultGuids = StreamSupport.stream(results.spliterator(), false)
            .map(Document::getGuid).collect(Collectors.toSet());
    Set<String> missingGuids = new HashSet<>(requestGuids);
    missingGuids.removeAll(resultGuids);
    return missingGuids;
  }

}
