/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.metron.indexing.dao;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
import org.apache.metron.indexing.dao.search.GroupResponse;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * An {@link IndexDao} that delegates to a list of composed {@link IndexDao}s.
 *
 * <p>Write-style operations ({@code update}, {@code batchUpdate}, adding and removing
 * comments) and document lookups ({@code getLatest}, {@code getAllLatest}) are issued to
 * every composed DAO through a parallel stream; failures from the individual DAOs are
 * collected and reported together in a single {@link IOException}.  Searches, groupings
 * and column metadata lookups ask the DAOs one at a time, in order, and return the first
 * non-null answer.
 */
public class MultiIndexDao implements IndexDao {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** The composed DAOs, in the order they were supplied. */
  private final List<IndexDao> indices;

  /**
   * @param composedDao The DAOs to delegate to.
   */
  public MultiIndexDao(IndexDao... composedDao) {
    indices = new ArrayList<>();
    Collections.addAll(indices, composedDao);
  }

  /**
   * @param composedDao The DAOs to delegate to.
   */
  public MultiIndexDao(Iterable<IndexDao> composedDao) {
    this.indices = new ArrayList<>();
    Iterables.addAll(indices, composedDao);
  }

  /**
   * @param composedDao The DAOs to delegate to.
   * @param decoratorTransformation Applied to each DAO; the result of the transformation
   *                                is what this instance delegates to.
   */
  public MultiIndexDao(Iterable<IndexDao> composedDao, Function<IndexDao, IndexDao> decoratorTransformation) {
    this(Iterables.transform(composedDao, x -> decoratorTransformation.apply(x)));
  }

  /**
   * Applies the update to every composed DAO.
   *
   * @param update The document to write.
   * @param index The optional index passed through to each DAO.
   * @return The {@code update} argument, unchanged.
   * @throws IOException If any DAO fails; the message lists every failure with its stack trace.
   */
  @Override
  public Document update(final Document update, Optional<String> index) throws IOException {
    List<String> failures = indices.parallelStream()
        .map(dao -> {
          try {
            dao.update(update, index);
            return null;
          } catch (Throwable e) {
            return describeFailure(dao, e);
          }
        })
        .filter(failure -> failure != null)
        .collect(Collectors.toList());
    throwIfAnyFailed(failures);
    return update;
  }

  /**
   * Applies the batch of updates to every composed DAO.
   *
   * @param updates The documents to write, each with its optional index.
   * @return The {@code updates} argument, unchanged.
   * @throws IOException If any DAO fails; the message lists every failure with its stack trace.
   */
  @Override
  public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
    List<String> failures = indices.parallelStream()
        .map(dao -> {
          try {
            dao.batchUpdate(updates);
            return null;
          } catch (Throwable e) {
            return describeFailure(dao, e);
          }
        })
        .filter(failure -> failure != null)
        .collect(Collectors.toList());
    throwIfAnyFailed(failures);
    return updates;
  }

  /**
   * Asks each DAO in order for the column metadata and returns the first non-null answer.
   *
   * @param in The argument passed through to each DAO's {@code getColumnMetadata}.
   * @return The first non-null result, or null if every DAO returned null.
   * @throws IOException If a queried DAO throws.
   */
  @Override
  public Map<String, FieldType> getColumnMetadata(List<String> in) throws IOException {
    for (IndexDao dao : indices) {
      Map<String, FieldType> metadata = dao.getColumnMetadata(in);
      if (metadata != null) {
        return metadata;
      }
    }
    return null;
  }

  /**
   * Looks up the latest version of the alert named by the request and adds the comment to it.
   *
   * @param request Request to add comments
   * @return The alert document with comments added; see
   *         {@link #addCommentToAlert(CommentAddRemoveRequest, Document)}.
   * @throws IOException If the lookup or any DAO's update fails.
   */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
    Document latest = getLatest(request.getGuid(), request.getSensorType());
    return addCommentToAlert(request, latest);
  }

  /**
   * Adds comments to an alert.  Updates are written to each Dao in parallel with the assumption that all updates
   * are identical.  Of the documents returned by the Daos, the one with the greatest timestamp is returned as the
   * current version of the alert with comments added.
   * @param request Request to add comments
   * @param latest The latest version of the alert the comments will be added to.
   * @return The alert document with comments added, or null if no Dao returned a document.
   * @throws IOException If any Dao fails; the message lists every failure with its stack trace.
   */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
    List<DocumentContainer> results = indices
        .parallelStream()
        .map(dao -> addCommentToAlert(dao, request, latest))
        .collect(Collectors.toList());
    return getLatestDocument(results);
  }

  /**
   * Adds the comment through a single DAO, capturing either the resulting document or the failure.
   */
  private DocumentContainer addCommentToAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) {
    try {
      Document document = indexDao.addCommentToAlert(request, latest);
      // Log the guid/sensorType from the request rather than the returned document: logger
      // arguments are evaluated eagerly, so dereferencing a null document here would raise an
      // NPE that the catch below would misreport as a DAO failure.
      LOG.debug("Added comment to alert; indexDao={}, guid={}, sensorType={}, document={}",
          ClassUtils.getShortClassName(indexDao.getClass()), request.getGuid(), request.getSensorType(), document);
      return new DocumentContainer(document);

    } catch (Throwable e) {
      LOG.error("Unable to add comment to alert; indexDao={}, error={}",
          ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
      return new DocumentContainer(e);
    }
  }

  /**
   * Looks up the latest version of the alert named by the request and removes the comment from it.
   *
   * @param request Request to remove comments
   * @return The alert document with comments removed; see
   *         {@link #removeCommentFromAlert(CommentAddRemoveRequest, Document)}.
   * @throws IOException If the lookup or any DAO's update fails.
   */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException {
    Document latest = getLatest(request.getGuid(), request.getSensorType());
    return removeCommentFromAlert(request, latest);
  }

  /**
   * Removes comments from an alert.  Updates are written to each Dao in parallel with the assumption that all updates
   * are identical.  Of the documents returned by the Daos, the one with the greatest timestamp is returned as the
   * current version of the alert with comments removed.
   * @param request Request to remove comments
   * @param latest The latest version of the alert the comments will be removed from.
   * @return The alert document with comments removed, or null if no Dao returned a document.
   * @throws IOException If any Dao fails; the message lists every failure with its stack trace.
   */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
    List<DocumentContainer> results = indices
        .parallelStream()
        .map(dao -> removeCommentFromAlert(dao, request, latest))
        .collect(Collectors.toList());
    return getLatestDocument(results);
  }

  /**
   * Removes the comment through a single DAO, capturing either the resulting document or the failure.
   */
  private DocumentContainer removeCommentFromAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) {
    try {
      Document document = indexDao.removeCommentFromAlert(request, latest);
      // See addCommentToAlert: the request supplies guid/sensorType so a null document
      // cannot trigger an NPE while building the log arguments.
      LOG.debug("Removed comment from alert; indexDao={}, guid={}, sensorType={}, document={}",
          ClassUtils.getShortClassName(indexDao.getClass()), request.getGuid(), request.getSensorType(), document);
      return new DocumentContainer(document);

    } catch (Throwable e) {
      LOG.error("Unable to remove comment from alert; indexDao={}, error={}",
          ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
      return new DocumentContainer(e);
    }
  }

  /**
   * Holds the outcome of a single-document operation against one DAO: either the document
   * (possibly absent) or the exception that was thrown.
   */
  protected static class DocumentContainer {
    private final Optional<Document> d;
    private final Optional<Throwable> t;

    public DocumentContainer(Document d) {
      this.d = Optional.ofNullable(d);
      this.t = Optional.empty();
    }

    public DocumentContainer(Throwable t) {
      this.d = Optional.empty();
      this.t = Optional.ofNullable(t);
    }

    public Optional<Document> getDocument() {
      return d;
    }

    public Optional<Throwable> getException() {
      return t;
    }

  }

  /**
   * Holds the outcome of a multi-document operation against one DAO: either the documents
   * (possibly absent) or the exception that was thrown.
   */
  private static class DocumentIterableContainer {
    private final Optional<Iterable<Document>> d;
    private final Optional<Throwable> t;

    public DocumentIterableContainer(Iterable<Document> d) {
      this.d = Optional.ofNullable(d);
      this.t = Optional.empty();
    }

    public DocumentIterableContainer(Throwable t) {
      this.d = Optional.empty();
      this.t = Optional.ofNullable(t);
    }

    public Optional<Iterable<Document>> getDocumentIterable() {
      return d;
    }

    public Optional<Throwable> getException() {
      return t;
    }

  }

  /**
   * Asks each DAO in order to run the search and returns the first non-null response.
   *
   * @param searchRequest The search to run.
   * @return The first non-null response, or null if every DAO returned null.
   * @throws InvalidSearchException If a queried DAO throws.
   */
  @Override
  public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
    for (IndexDao dao : indices) {
      SearchResponse response = dao.search(searchRequest);
      if (response != null) {
        return response;
      }
    }
    return null;
  }

  /**
   * Asks each DAO in order to run the grouping and returns the first non-null response.
   *
   * @param groupRequest The grouping to run.
   * @return The first non-null response, or null if every DAO returned null.
   * @throws InvalidSearchException If a queried DAO throws.
   */
  @Override
  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
    for (IndexDao dao : indices) {
      GroupResponse response = dao.group(groupRequest);
      if (response != null) {
        return response;
      }
    }
    return null;
  }

  /**
   * Initializes every composed DAO, in order, with the same configuration.
   *
   * @param config The configuration handed to each DAO.
   */
  @Override
  public void init(AccessConfig config) {
    for (IndexDao dao : indices) {
      dao.init(config);
    }
  }

  /**
   * Queries every composed DAO for the document and returns the one with the greatest timestamp.
   *
   * @param guid The guid of the document.
   * @param sensorType The sensor type of the document.
   * @return The most recent document found, or null if no DAO returned one.
   * @throws IOException If any DAO fails; the message lists every failure with its stack trace.
   */
  @Override
  public Document getLatest(final String guid, String sensorType) throws IOException {
    List<DocumentContainer> results = indices
        .parallelStream()
        .map(dao -> getLatest(dao, guid, sensorType))
        .collect(Collectors.toList());
    return getLatestDocument(results);
  }

  /**
   * Fetches the document from a single DAO, capturing either the document or the failure.
   */
  private DocumentContainer getLatest(IndexDao indexDao, String guid, String sensorType) {
    try {
      Document document = indexDao.getLatest(guid, sensorType);
      LOG.debug("Found latest document; indexDao={}, guid={}, sensorType={}, document={}",
          ClassUtils.getShortClassName(indexDao.getClass()), guid, sensorType, document);
      return new DocumentContainer(document);

    } catch (Throwable e) {
      LOG.error("Unable to find latest document; indexDao={}, error={}",
          ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
      return new DocumentContainer(e);
    }
  }

  /**
   * Queries every composed DAO for the requested documents and returns the first non-null
   * result, taken in DAO order.  Results from different DAOs are not merged.
   *
   * @param getRequests The documents to fetch.
   * @return The first non-null result, or null if every DAO returned null.
   * @throws IOException If any DAO fails; the message lists every failure with its stack trace.
   */
  @Override
  public Iterable<Document> getAllLatest(
      List<GetRequest> getRequests) throws IOException {
    List<DocumentIterableContainer> results = indices.parallelStream()
        .map(dao -> {
          try {
            return new DocumentIterableContainer(dao.getAllLatest(getRequests));
          } catch (Throwable e) {
            return new DocumentIterableContainer(e);
          }
        })
        .collect(Collectors.toList());

    Iterable<Document> firstFound = null;
    List<String> failures = new ArrayList<>();
    for (DocumentIterableContainer result : results) {
      if (result.getException().isPresent()) {
        failures.add(describeFailure(result.getException().get()));
      } else if (firstFound == null && result.getDocumentIterable().isPresent()) {
        firstFound = result.getDocumentIterable().get();
      }
    }
    throwIfAnyFailed(failures);
    return firstFound;
  }

  /**
   * @return The list of composed DAOs held by this instance (the internal list, not a copy).
   */
  public List<IndexDao> getIndices() {
    return indices;
  }

  /**
   * Returns the most recent {@link Document} from a list of {@link DocumentContainer}s.
   *
   * @param documentContainers A list of containers; each retrieved from a separate index.
   * @return The latest {@link Document} found, or null if no container holds a document.
   * @throws IOException If any of the {@link DocumentContainer}s contain an exception.
   */
  private Document getLatestDocument(List<DocumentContainer> documentContainers) throws IOException {
    Document latestDocument = null;
    List<String> failures = new ArrayList<>();

    for (DocumentContainer dc : documentContainers) {
      if (dc.getException().isPresent()) {
        // collect each exception; multiple can occur, one in each index
        failures.add(describeFailure(dc.getException().get()));

      } else if (dc.getDocument().isPresent()) {
        Document candidate = dc.getDocument().get();
        // is this the latest document so far?  ties keep the earlier container's document
        if (latestDocument == null || latestDocument.getTimestamp() < candidate.getTimestamp()) {
          latestDocument = candidate;
        }
      }
      // otherwise no document was found in the index; nothing to record
    }
    // report all of the errors encountered
    throwIfAnyFailed(failures);
    return latestDocument;
  }

  /**
   * Formats a failure raised by a specific DAO: DAO class, message, then the stack trace.
   */
  private static String describeFailure(IndexDao dao, Throwable e) {
    return dao.getClass() + ": " + describeFailure(e);
  }

  /**
   * Formats a failure as its message followed by the stack trace.
   */
  private static String describeFailure(Throwable e) {
    return e.getMessage() + "\n" + ExceptionUtils.getStackTrace(e);
  }

  /**
   * Throws a single {@link IOException} joining all failure descriptions, if there are any.
   */
  private static void throwIfAnyFailed(List<String> failures) throws IOException {
    if (!failures.isEmpty()) {
      throw new IOException(Joiner.on("\n").join(failures));
    }
  }
}
