/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.elasticsearch;

import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

/** Test utilities to use with {@link ElasticsearchIO}. */
class ElasticsearchIOTestUtils {
  static final String[] FAMOUS_SCIENTISTS = {
    "Einstein",
    "Darwin",
    "Copernicus",
    "Pasteur",
    "Curie",
    "Faraday",
    "Newton",
    "Bohr",
    "Galilei",
    "Maxwell"
  };
  static final int NUM_SCIENTISTS = FAMOUS_SCIENTISTS.length;

  /** Enumeration that specifies whether to insert malformed documents. */
  public enum InjectionMode {
    INJECT_SOME_INVALID_DOCS,
    DO_NOT_INJECT_INVALID_DOCS
  }

  /** Deletes the given index synchronously. */
  static void deleteIndex(ConnectionConfiguration connectionConfiguration, RestClient restClient)
      throws IOException {
    deleteIndex(restClient, connectionConfiguration.getIndex());
  }

  private static void closeIndex(RestClient restClient, String index) throws IOException {
    restClient.performRequest("POST", String.format("/%s/_close", index));
  }

  private static void deleteIndex(RestClient restClient, String index) throws IOException {
    try {
      closeIndex(restClient, index);
      restClient.performRequest(
          "DELETE", String.format("/%s", index), Collections.singletonMap("refresh", "wait_for"));
    } catch (IOException e) {
      // it is fine to ignore this expression as deleteIndex occurs in @before,
      // so when the first tests is run, the index does not exist yet
      if (!e.getMessage().contains("index_not_found_exception")) {
        throw e;
      }
    }
  }

  /**
   * Synchronously deletes the target if it exists and then (re)creates it as a copy of source
   * synchronously.
   */
  static void copyIndex(RestClient restClient, String source, String target) throws IOException {
    deleteIndex(restClient, target);
    HttpEntity entity =
        new NStringEntity(
            String.format(
                "{\"source\" : { \"index\" : \"%s\" }, \"dest\" : { \"index\" : \"%s\" } }",
                source, target),
            ContentType.APPLICATION_JSON);
    restClient.performRequest(
        "POST", "/_reindex", Collections.singletonMap("refresh", "wait_for"), entity);
  }

  /** Inserts the given number of test documents into Elasticsearch. */
  static void insertTestDocuments(
      ConnectionConfiguration connectionConfiguration, long numDocs, RestClient restClient)
      throws IOException {
    List<String> data =
        ElasticsearchIOTestUtils.createDocuments(
            numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
    StringBuilder bulkRequest = new StringBuilder();
    int i = 0;
    for (String document : data) {
      bulkRequest.append(
          String.format(
              "{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }%n%s%n",
              connectionConfiguration.getIndex(),
              connectionConfiguration.getType(),
              i++,
              document));
    }
    String endPoint =
        String.format(
            "/%s/%s/_bulk", connectionConfiguration.getIndex(), connectionConfiguration.getType());
    HttpEntity requestBody =
        new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
    Response response =
        restClient.performRequest(
            "POST", endPoint, Collections.singletonMap("refresh", "wait_for"), requestBody);
    ElasticsearchIO.checkForErrors(
        response.getEntity(), ElasticsearchIO.getBackendVersion(connectionConfiguration), false);
  }
  /**
   * Forces a refresh of the given index to make recently inserted documents available for search
   * using the index and type named in the connectionConfiguration.
   *
   * @param connectionConfiguration providing the index and type
   * @param restClient To use for issuing queries
   * @return The number of docs in the index
   * @throws IOException On error communicating with Elasticsearch
   */
  static long refreshIndexAndGetCurrentNumDocs(
      ConnectionConfiguration connectionConfiguration, RestClient restClient) throws IOException {
    return refreshIndexAndGetCurrentNumDocs(
        restClient, connectionConfiguration.getIndex(), connectionConfiguration.getType());
  }

  /**
   * Forces a refresh of the given index to make recently inserted documents available for search.
   *
   * @param restClient To use for issuing queries
   * @param index The Elasticsearch index
   * @param type The Elasticsearch type
   * @return The number of docs in the index
   * @throws IOException On error communicating with Elasticsearch
   */
  static long refreshIndexAndGetCurrentNumDocs(RestClient restClient, String index, String type)
      throws IOException {
    long result = 0;
    try {
      String endPoint = String.format("/%s/_refresh", index);
      restClient.performRequest("POST", endPoint);

      endPoint = String.format("/%s/%s/_search", index, type);
      Response response = restClient.performRequest("GET", endPoint);
      JsonNode searchResult = ElasticsearchIO.parseResponse(response.getEntity());
      result = searchResult.path("hits").path("total").asLong();
    } catch (IOException e) {
      // it is fine to ignore bellow exceptions because in testWriteWithBatchSize* sometimes,
      // we call upgrade before any doc have been written
      // (when there are fewer docs processed than batchSize).
      // In that cases index/type has not been created (created upon first doc insertion)
      if (!e.getMessage().contains("index_not_found_exception")) {
        throw e;
      }
    }
    return result;
  }

  /**
   * Generates a list of test documents for insertion.
   *
   * @param numDocs Number of docs to generate
   * @param injectionMode {@link InjectionMode} that specifies whether to insert malformed documents
   * @return the list of json String representing the documents
   */
  static List<String> createDocuments(long numDocs, InjectionMode injectionMode) {

    ArrayList<String> data = new ArrayList<>();
    for (int i = 0; i < numDocs; i++) {
      int index = i % FAMOUS_SCIENTISTS.length;
      // insert 2 malformed documents
      if (InjectionMode.INJECT_SOME_INVALID_DOCS.equals(injectionMode) && (i == 6 || i == 7)) {
        data.add(String.format("{\"scientist\";\"%s\", \"id\":%s}", FAMOUS_SCIENTISTS[index], i));
      } else {
        data.add(String.format("{\"scientist\":\"%s\", \"id\":%s}", FAMOUS_SCIENTISTS[index], i));
      }
    }
    return data;
  }

  /**
   * Executes a query for the named scientist and returns the count from the result.
   *
   * @param connectionConfiguration Specifies the index and type
   * @param restClient To use to execute the call
   * @param scientistName The scientist to query for
   * @return The cound of documents found
   * @throws IOException On error talking to Elasticsearch
   */
  static int countByScientistName(
      ConnectionConfiguration connectionConfiguration, RestClient restClient, String scientistName)
      throws IOException {
    return countByMatch(connectionConfiguration, restClient, "scientist", scientistName);
  }

  /**
   * Executes a match query for given field/value and returns the count of results.
   *
   * @param connectionConfiguration Specifies the index and type
   * @param restClient To use to execute the call
   * @param field The field to query
   * @param value The value to match
   * @return The count of documents in the search result
   * @throws IOException On error communicating with Elasticsearch
   */
  static int countByMatch(
      ConnectionConfiguration connectionConfiguration,
      RestClient restClient,
      String field,
      String value)
      throws IOException {
    String requestBody =
        "{\n"
            + "  \"query\" : {\"match\": {\n"
            + "    \""
            + field
            + "\": \""
            + value
            + "\"\n"
            + "  }}\n"
            + "}\n";
    String endPoint =
        String.format(
            "/%s/%s/_search",
            connectionConfiguration.getIndex(), connectionConfiguration.getType());
    HttpEntity httpEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
    Response response =
        restClient.performRequest("GET", endPoint, Collections.emptyMap(), httpEntity);
    JsonNode searchResult = parseResponse(response.getEntity());
    return searchResult.path("hits").path("total").asInt();
  }

  public static void setIndexMapping(
      ConnectionConfiguration connectionConfiguration, RestClient restClient) throws IOException {
    String endpoint = String.format("/%s", connectionConfiguration.getIndex());
    String requestString =
        String.format(
            "{\"mappings\":{\"%s\":{\"properties\":{\"age\":{\"type\":\"long\"},"
                + " \"scientist\":{\"type\":\"%s\"}, \"id\":{\"type\":\"long\"}}}}}",
            connectionConfiguration.getType(),
            getBackendVersion(connectionConfiguration) == 2 ? "string" : "text");
    HttpEntity requestBody = new NStringEntity(requestString, ContentType.APPLICATION_JSON);
    Request request = new Request("PUT", endpoint);
    request.setEntity(requestBody);
    restClient.performRequest(request);
  }
}
