/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.metron.solr.writer;

import static org.apache.metron.solr.SolrConstants.SOLR_WRITER_NAME;

import com.google.common.base.Joiner;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageId;
import org.apache.metron.solr.SolrConstants;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {

  public static final String JAVA_SECURITY_CONFIG_PROPERTY = "java.security.auth.login.config";

  public enum SolrProperties {
    ZOOKEEPER_QUORUM(SolrConstants.SOLR_ZOOKEEPER),
    COMMIT_PER_BATCH("solr.commitPerBatch", Optional.of(true)),
    COMMIT_WAIT_SEARCHER("solr.commit.waitSearcher", Optional.of(true)),
    COMMIT_WAIT_FLUSH("solr.commit.waitFlush", Optional.of(true)),
    COMMIT_SOFT("solr.commit.soft", Optional.of(false)),
    DEFAULT_COLLECTION("solr.collection", Optional.of("metron")),
    HTTP_CONFIG("solr.http.config", Optional.of(new HashMap<>()))
    ;
    String name;
    Optional<Object> defaultValue;

    SolrProperties(String name) {
      this(name, Optional.empty());
    }
    SolrProperties(String name, Optional<Object> defaultValue) {
      this.name = name;
      this.defaultValue = defaultValue;
    }

    public <T> Optional<T> coerceOrDefault(Map<String, Object> globalConfig, Class<T> clazz) {
      Object val = globalConfig.get(name);
      if(val != null) {
        T ret = null;
        try {
          ret = ConversionUtils.convert(val, clazz);
        }
        catch(ClassCastException cce) {
          ret = null;
        }
        if(ret == null) {
          //unable to convert value
          LOG.warn("Unable to convert {} to {}, was {}", name, clazz.getName(), "" + val);
          if(defaultValue.isPresent()) {
            return Optional.ofNullable(ConversionUtils.convert(defaultValue.get(), clazz));
          }
          else {
            return Optional.empty();
          }
        }
        else {
          return Optional.ofNullable(ret);
        }
      }
      else {
        if(defaultValue.isPresent()) {
          return Optional.ofNullable(ConversionUtils.convert(defaultValue.get(), clazz));
        }
        else {
          return Optional.empty();
        }
      }
    }

    public Supplier<IllegalArgumentException> errorOut(Map<String, Object> globalConfig) {
      String message = "Unable to retrieve " + name + " from global config, value associated is " + globalConfig.get(name);
      return () -> new IllegalArgumentException(message);
    }

    public <T> T coerceOrDefaultOrExcept(Map<String, Object> globalConfig, Class<T> clazz) {
         return this.coerceOrDefault(globalConfig, clazz).orElseThrow(this.errorOut(globalConfig));
    }

  }


  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private Boolean shouldCommit;
  private Boolean softCommit;
  private Boolean waitSearcher;
  private Boolean waitFlush;
  private String zookeeperUrl;
  private String defaultCollection;
  private Map<String, Object> solrHttpConfig;

  private MetronSolrClient solr;

  public SolrWriter withMetronSolrClient(MetronSolrClient solr) {
    this.solr = solr;
    return this;
  }

  public void initializeFromGlobalConfig(Map<String, Object> globalConfiguration) {
    zookeeperUrl = SolrProperties.ZOOKEEPER_QUORUM.coerceOrDefaultOrExcept(globalConfiguration, String.class);
    defaultCollection = SolrProperties.DEFAULT_COLLECTION.coerceOrDefaultOrExcept(globalConfiguration, String.class);
    solrHttpConfig = SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept(globalConfiguration, Map.class);
    shouldCommit = SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept(globalConfiguration, Boolean.class);
    softCommit = SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept(globalConfiguration, Boolean.class);
    waitSearcher = SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept(globalConfiguration, Boolean.class);
    waitFlush = SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept(globalConfiguration, Boolean.class);
  }

  @Override
  public void init(Map stormConf, WriterConfiguration configurations) throws IOException, SolrServerException {
    Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
    initializeFromGlobalConfig(globalConfiguration);
    LOG.info("Initializing SOLR writer: {}", zookeeperUrl);
    LOG.info("Forcing commit per batch: {}", shouldCommit);
    LOG.info("Soft commit: {}", softCommit);
    LOG.info("Commit Wait Searcher: {}", waitSearcher);
    LOG.info("Commit Wait Flush: {}", waitFlush);
    LOG.info("Default Collection: {}", "" + defaultCollection );
    if(solr == null) {
      if (isKerberosEnabled(stormConf)) {
        HttpClientUtil.addConfigurer(new Krb5HttpClientConfigurer());
      }
      solr = new MetronSolrClient(zookeeperUrl, solrHttpConfig);
    }
    solr.setDefaultCollection(defaultCollection);

  }

  public Collection<SolrInputDocument> toDocs(Iterable<BulkMessage<JSONObject>> messages) {
    Collection<SolrInputDocument> ret = new ArrayList<>();
    for(BulkMessage<JSONObject> bulkWriterMessage: messages) {
      SolrInputDocument document = new SolrInputDocument();
      JSONObject message = bulkWriterMessage.getMessage();
      for (Object key : message.keySet()) {
        Object value = message.get(key);
        if (value instanceof Iterable) {
          for (Object v : (Iterable) value) {
            document.addField("" + key, v);
          }
        } else {
          document.addField("" + key, value);
        }
      }
      if (!document.containsKey(Constants.GUID)) {
        document.addField(Constants.GUID, UUID.randomUUID().toString());
      }
      ret.add(document);
    }
    return ret;
  }

  protected String getCollection(String sourceType, WriterConfiguration configurations) {
    String collection = configurations.getIndex(sourceType);
    if(StringUtils.isEmpty(collection)) {
      return solr.getDefaultCollection();
    }
    return collection;
  }

  @Override
  public BulkWriterResponse write(String sourceType, WriterConfiguration configurations, List<BulkMessage<JSONObject>> messages) throws Exception {
    String collection = getCollection(sourceType, configurations);
    BulkWriterResponse bulkResponse = new BulkWriterResponse();
    Collection<SolrInputDocument> docs = toDocs(messages);
    Set<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toSet());
    try {
      Optional<SolrException> exceptionOptional = fromUpdateResponse(solr.add(collection, docs));
      // Solr commits the entire batch or throws an exception for it.  There's no way to get partial failures.
      if(exceptionOptional.isPresent()) {
        bulkResponse.addAllErrors(exceptionOptional.get(), ids);
      }
      else {
        if (shouldCommit) {
          exceptionOptional = fromUpdateResponse(solr.commit(collection, waitFlush, waitSearcher, softCommit));
          if(exceptionOptional.isPresent()) {
            bulkResponse.addAllErrors(exceptionOptional.get(), ids);
          }
        }
        if(!exceptionOptional.isPresent()) {
          bulkResponse.addAllSuccesses(ids);
        }
      }
    }
    catch(HttpSolrClient.RemoteSolrException sse) {
      bulkResponse.addAllErrors(sse, ids);
    }

    return bulkResponse;
  }

  protected Optional<SolrException> fromUpdateResponse(UpdateResponse response) {
    if(response != null && response.getStatus() > 0) {
      String message = "Solr Update response: " + Joiner.on(",").join(response.getResponse());
      return Optional.of(new SolrException(SolrException.ErrorCode.BAD_REQUEST, message));
    }
    return Optional.empty();
  }

  @Override
  public String getName() {
    return SOLR_WRITER_NAME;
  }

  @Override
  public void close() throws Exception {
    if(solr != null) {
      solr.close();
    }
  }

  private boolean isKerberosEnabled(Map stormConfig) {
    if (stormConfig == null) {
      return false;
    }
    String value = (String) stormConfig.get(JAVA_SECURITY_CONFIG_PROPERTY);
    return value != null && !value.isEmpty();
  }
}
