Merge pull request #6293 [BEAM-5255] Fix over-aggressive division futurization in benchmarks.

[BEAM-5255] Fix over-aggressive division futurization in benchmarks.
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 862ba5b..9920dde 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -21,9 +21,9 @@
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
@@ -61,6 +61,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTest.class);
 
   private static final String ES_IP = "127.0.0.1";
+  private static final int MAX_STARTUP_WAITING_TIME_MSEC = 5000;
 
   private static Node node;
   private static RestClient restClient;
@@ -97,10 +98,25 @@
     node.start();
     connectionConfiguration =
         ConnectionConfiguration.create(
-            new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
+            new String[] {"http://" + ES_IP + ":" + esHttpPort}, getEsIndex(), ES_TYPE);
     restClient = connectionConfiguration.createClient();
     elasticsearchIOTestCommon =
         new ElasticsearchIOTestCommon(connectionConfiguration, restClient, false);
+    int waitingTime = 0;
+    int healthCheckFrequency = 500;
+    while ((waitingTime < MAX_STARTUP_WAITING_TIME_MSEC)
+        && restClient.performRequest("HEAD", "/").getStatusLine().getStatusCode() != 200) {
+      try {
+        Thread.sleep(healthCheckFrequency);
+        waitingTime += healthCheckFrequency;
+      } catch (InterruptedException e) {
+        LOG.warn(
+            "Waiting thread was interrupted while waiting for connection to Elasticsearch to be available");
+      }
+    }
+    if (waitingTime >= MAX_STARTUP_WAITING_TIME_MSEC) {
+      throw new IOException("Max startup waiting for embedded Elasticsearch to start was exceeded");
+    }
   }
 
   @AfterClass
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index d2791c7..b453b9f 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -21,12 +21,11 @@
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
 import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
 import java.io.IOException;
@@ -100,7 +99,8 @@
   @Before
   public void setup() {
     if (connectionConfiguration == null) {
-      connectionConfiguration = ConnectionConfiguration.create(fillAddresses(), ES_INDEX, ES_TYPE);
+      connectionConfiguration =
+          ConnectionConfiguration.create(fillAddresses(), getEsIndex(), ES_TYPE);
       elasticsearchIOTestCommon =
           new ElasticsearchIOTestCommon(connectionConfiguration, getRestClient(), false);
     }
@@ -112,7 +112,7 @@
   public void testSizes() throws Exception {
     // need to create the index using the helper method (not create it at first insertion)
     // for the indexSettings() to be run
-    createIndex(ES_INDEX);
+    createIndex(getEsIndex());
     elasticsearchIOTestCommon.testSizes();
   }
 
@@ -120,7 +120,7 @@
   public void testRead() throws Exception {
     // need to create the index using the helper method (not create it at first insertion)
     // for the indexSettings() to be run
-    createIndex(ES_INDEX);
+    createIndex(getEsIndex());
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testRead();
   }
@@ -129,7 +129,7 @@
   public void testReadWithQuery() throws Exception {
     // need to create the index using the helper method (not create it at first insertion)
     // for the indexSettings() to be run
-    createIndex(ES_INDEX);
+    createIndex(getEsIndex());
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testReadWithQuery();
   }
@@ -162,7 +162,7 @@
   public void testSplit() throws Exception {
     //need to create the index using the helper method (not create it at first insertion)
     // for the indexSettings() to be run
-    createIndex(ES_INDEX);
+    createIndex(getEsIndex());
     ElasticSearchIOTestUtils.insertTestDocuments(
         connectionConfiguration, NUM_DOCS_UTESTS, getRestClient());
     PipelineOptions options = PipelineOptionsFactory.create();
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index 4b95ea0..6867a95 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -60,8 +60,13 @@
     deleteIndex(restClient, connectionConfiguration.getIndex());
   }
 
+  private static void closeIndex(RestClient restClient, String index) throws IOException {
+    restClient.performRequest("POST", String.format("/%s/_close", index));
+  }
+
   private static void deleteIndex(RestClient restClient, String index) throws IOException {
     try {
+      closeIndex(restClient, index);
       restClient.performRequest("DELETE", String.format("/%s", index));
     } catch (IOException e) {
       // it is fine to ignore this expression as deleteIndex occurs in @before,
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
index 6598e97..6ef38bd 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
@@ -18,9 +18,9 @@
 package org.apache.beam.sdk.io.elasticsearch;
 
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_ITESTS;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
 
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -54,9 +54,9 @@
 
   /** Enum encapsulating the mode of operation and the index. */
   enum IndexMode {
-    READ(ES_INDEX),
-    WRITE(ES_INDEX + System.currentTimeMillis()),
-    WRITE_PARTIAL(ES_INDEX + "_partial_" + System.currentTimeMillis());
+    READ(getEsIndex()),
+    WRITE(getEsIndex() + System.currentTimeMillis()),
+    WRITE_PARTIAL(getEsIndex() + "_partial_" + System.currentTimeMillis());
 
     private final String index;
 
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 57b450d..5a8ad78 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -82,7 +82,10 @@
       "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n"
           + "{ \"field1\" : @ }\n";
 
-  static final String ES_INDEX = "beam";
+  static String getEsIndex() {
+    return "beam" + Thread.currentThread().getId();
+  }
+
   static final String ES_TYPE = "test";
   static final long NUM_DOCS_UTESTS = 400L;
   static final long NUM_DOCS_ITESTS = 50000L;