/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.elasticsearch;

import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.UPDATE_INDEX;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.UPDATE_TYPE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;

import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.testing.TestPipeline;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Tests for {@link ElasticsearchIO} version 2.x. */
@RunWith(JUnit4.class)
public class ElasticsearchIOTest implements Serializable {

  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTest.class);

  private static final String ES_IP = "127.0.0.1";
  private static final int MAX_STARTUP_WAITING_TIME_MSEC = 5000;
  private static int esHttpPort;
  private static Node node;
  private static RestClient restClient;
  private static ConnectionConfiguration connectionConfiguration;
  // cannot use inheritance because ES5 test already extends ESIntegTestCase.
  private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;

  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

  @Rule public TestPipeline pipeline = TestPipeline.create();

  @BeforeClass
  public static void beforeClass() throws IOException {
    esHttpPort = NetworkTestHelper.getAvailableLocalPort();
    LOG.info("Starting embedded Elasticsearch instance ({})", esHttpPort);
    Settings.Builder settingsBuilder =
        Settings.settingsBuilder()
            .put("cluster.name", "beam")
            .put("http.enabled", "true")
            .put("node.data", "true")
            .put("path.data", TEMPORARY_FOLDER.getRoot().getPath())
            .put("path.home", TEMPORARY_FOLDER.getRoot().getPath())
            .put("node.name", "beam")
            .put("network.host", ES_IP)
            .put("http.port", esHttpPort)
            .put("index.store.stats_refresh_interval", 0)
            // had problems with some jdk, embedded ES was too slow for bulk insertion,
            // and queue of 50 was full. No pb with real ES instance (cf testWrite integration test)
            .put("threadpool.bulk.queue_size", 400);
    node = new Node(settingsBuilder.build());
    LOG.info("Elasticsearch node created");
    node.start();
    connectionConfiguration =
        ConnectionConfiguration.create(
                new String[] {"http://" + ES_IP + ":" + esHttpPort}, getEsIndex(), ES_TYPE)
            .withSocketAndRetryTimeout(120000)
            .withConnectTimeout(5000);
    restClient = connectionConfiguration.createClient();
    elasticsearchIOTestCommon =
        new ElasticsearchIOTestCommon(connectionConfiguration, restClient, false);
    int waitingTime = 0;
    int healthCheckFrequency = 500;
    while ((waitingTime < MAX_STARTUP_WAITING_TIME_MSEC)
        && restClient.performRequest("HEAD", "/").getStatusLine().getStatusCode() != 200) {
      try {
        Thread.sleep(healthCheckFrequency);
        waitingTime += healthCheckFrequency;
      } catch (InterruptedException e) {
        LOG.warn(
            "Waiting thread was interrupted while waiting for connection to Elasticsearch to be available");
      }
    }
    if (waitingTime >= MAX_STARTUP_WAITING_TIME_MSEC) {
      throw new IOException("Max startup waiting for embedded Elasticsearch to start was exceeded");
    }
  }

  @AfterClass
  public static void afterClass() throws IOException {
    restClient.close();
    node.close();
  }

  @Before
  public void before() throws Exception {
    ElasticSearchIOTestUtils.deleteIndex(connectionConfiguration, restClient);
  }

  @Test
  public void testSizes() throws Exception {
    elasticsearchIOTestCommon.testSizes();
  }

  @Test
  public void testRead() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testRead();
  }

  @Test
  public void testReadWithQuery() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testReadWithQuery();
  }

  @Test
  public void testWrite() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWrite();
  }

  @Rule public ExpectedException expectedException = ExpectedException.none();

  @Test
  public void testWriteWithErrors() throws Exception {
    elasticsearchIOTestCommon.setExpectedException(expectedException);
    elasticsearchIOTestCommon.testWriteWithErrors();
  }

  @Test
  public void testWriteWithMaxBatchSize() throws Exception {
    elasticsearchIOTestCommon.testWriteWithMaxBatchSize();
  }

  @Test
  public void testWriteWithMaxBatchSizeBytes() throws Exception {
    elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes();
  }

  @Test
  public void testSplit() throws Exception {
    elasticsearchIOTestCommon.testSplit(0);
  }

  @Test
  public void testWriteWithIdFn() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteWithIdFn();
  }

  @Test
  public void testWriteWithIndexFn() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteWithIndexFn();
  }

  @Test
  public void testWriteWithTypeFn() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteWithTypeFn2x5x();
  }

  @Test
  public void testWriteFullAddressing() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteWithFullAddressing();
  }

  @Test
  public void testWritePartialUpdate() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWritePartialUpdate();
  }

  @Test
  public void testWritePartialUpdateWithErrors() throws Exception {
    // cannot share elasticsearchIOTestCommon because tests run in parallel.
    ConnectionConfiguration connectionConfiguration =
        ConnectionConfiguration.create(
            new String[] {"http://" + ES_IP + ":" + esHttpPort}, UPDATE_INDEX, UPDATE_TYPE);
    ElasticsearchIOTestCommon elasticsearchIOTestCommonWithErrors =
        new ElasticsearchIOTestCommon(connectionConfiguration, restClient, false);
    elasticsearchIOTestCommonWithErrors.setPipeline(pipeline);
    elasticsearchIOTestCommonWithErrors.testWritePartialUpdateWithErrors();
  }

  @Test
  public void testReadWithMetadata() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testReadWithMetadata();
  }

  @Test
  public void testDefaultRetryPredicate() throws IOException {
    elasticsearchIOTestCommon.testDefaultRetryPredicate(restClient);
  }

  @Test
  public void testWriteRetry() throws Throwable {
    elasticsearchIOTestCommon.setExpectedException(expectedException);
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteRetry();
  }

  @Test
  public void testWriteRetryValidRequest() throws Exception {
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteRetryValidRequest();
  }
}
