Merge pull request #6293 [BEAM-5255] Fix over-aggressive division futurization in benchmarks.
[BEAM-5255] Fix over-aggressive division futurization in benchmarks.
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 862ba5b..9920dde 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -21,9 +21,9 @@
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
@@ -61,6 +61,7 @@
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTest.class);
private static final String ES_IP = "127.0.0.1";
+ private static final int MAX_STARTUP_WAITING_TIME_MSEC = 5000;
private static Node node;
private static RestClient restClient;
@@ -97,10 +98,25 @@
node.start();
connectionConfiguration =
ConnectionConfiguration.create(
- new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE);
+ new String[] {"http://" + ES_IP + ":" + esHttpPort}, getEsIndex(), ES_TYPE);
restClient = connectionConfiguration.createClient();
elasticsearchIOTestCommon =
new ElasticsearchIOTestCommon(connectionConfiguration, restClient, false);
+ // Poll the embedded node with HEAD / until it answers 200 or the startup budget
+ // (MAX_STARTUP_WAITING_TIME_MSEC) is spent, probing once more after the last sleep.
+ final int healthCheckFrequency = 500;
+ int waitingTime = 0;
+ boolean available = false;
+ IOException lastFailure = null;
+ while (true) {
+   try {
+     available = restClient.performRequest("HEAD", "/").getStatusLine().getStatusCode() == 200;
+   } catch (IOException e) {
+     // performRequest reports an unreachable or erroring node by throwing rather than by
+     // returning a status code, so a failed probe means "not ready yet", not "give up".
+     lastFailure = e;
+   }
+   if (available || waitingTime >= MAX_STARTUP_WAITING_TIME_MSEC) {
+     break;
+   }
+   try {
+     Thread.sleep(healthCheckFrequency);
+     waitingTime += healthCheckFrequency;
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag for callers and stop waiting: sleeping again would throw
+     // immediately and the loop would spin without ever advancing waitingTime.
+     Thread.currentThread().interrupt();
+     throw new IOException(
+         "Waiting thread was interrupted while waiting for connection to Elasticsearch to be available",
+         e);
+   }
+ }
+ if (!available) {
+   // Attach the last probe failure (may be null) so the root cause is visible in the report.
+   throw new IOException(
+       "Max startup waiting for embedded Elasticsearch to start was exceeded", lastFailure);
+ }
}
@AfterClass
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index d2791c7..b453b9f 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -21,12 +21,11 @@
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import java.io.IOException;
@@ -100,7 +99,8 @@
@Before
public void setup() {
if (connectionConfiguration == null) {
- connectionConfiguration = ConnectionConfiguration.create(fillAddresses(), ES_INDEX, ES_TYPE);
+ connectionConfiguration =
+ ConnectionConfiguration.create(fillAddresses(), getEsIndex(), ES_TYPE);
elasticsearchIOTestCommon =
new ElasticsearchIOTestCommon(connectionConfiguration, getRestClient(), false);
}
@@ -112,7 +112,7 @@
public void testSizes() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
// for the indexSettings() to be run
- createIndex(ES_INDEX);
+ createIndex(getEsIndex());
elasticsearchIOTestCommon.testSizes();
}
@@ -120,7 +120,7 @@
public void testRead() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
// for the indexSettings() to be run
- createIndex(ES_INDEX);
+ createIndex(getEsIndex());
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testRead();
}
@@ -129,7 +129,7 @@
public void testReadWithQuery() throws Exception {
// need to create the index using the helper method (not create it at first insertion)
// for the indexSettings() to be run
- createIndex(ES_INDEX);
+ createIndex(getEsIndex());
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testReadWithQuery();
}
@@ -162,7 +162,7 @@
public void testSplit() throws Exception {
//need to create the index using the helper method (not create it at first insertion)
// for the indexSettings() to be run
- createIndex(ES_INDEX);
+ createIndex(getEsIndex());
ElasticSearchIOTestUtils.insertTestDocuments(
connectionConfiguration, NUM_DOCS_UTESTS, getRestClient());
PipelineOptions options = PipelineOptionsFactory.create();
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index 4b95ea0..6867a95 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -60,8 +60,13 @@
deleteIndex(restClient, connectionConfiguration.getIndex());
}
+ /**
+  * Issues a {@code POST /<index>/_close} request through the given client.
+  *
+  * @param restClient client used to perform the request
+  * @param index name of the index, substituted into the request path as-is
+  * @throws IOException if the request fails
+  */
+ private static void closeIndex(RestClient restClient, String index) throws IOException {
+   final String closeEndpoint = String.format("/%s/_close", index);
+   restClient.performRequest("POST", closeEndpoint);
+ }
+
private static void deleteIndex(RestClient restClient, String index) throws IOException {
try {
+ closeIndex(restClient, index);
restClient.performRequest("DELETE", String.format("/%s", index));
} catch (IOException e) {
// it is fine to ignore this expression as deleteIndex occurs in @before,
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
index 6598e97..6ef38bd 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.io.elasticsearch;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
-import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_ITESTS;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -54,9 +54,9 @@
/** Enum encapsulating the mode of operation and the index. */
enum IndexMode {
- READ(ES_INDEX),
- WRITE(ES_INDEX + System.currentTimeMillis()),
- WRITE_PARTIAL(ES_INDEX + "_partial_" + System.currentTimeMillis());
+ READ(getEsIndex()),
+ WRITE(getEsIndex() + System.currentTimeMillis()),
+ WRITE_PARTIAL(getEsIndex() + "_partial_" + System.currentTimeMillis());
private final String index;
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 57b450d..5a8ad78 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -82,7 +82,10 @@
"{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n"
+ "{ \"field1\" : @ }\n";
- static final String ES_INDEX = "beam";
+ /**
+  * Builds an index name made of the prefix {@code "beam"} followed by the id of the calling
+  * thread, so different threads get different index names.
+  *
+  * @return {@code "beam"} concatenated with {@code Thread.currentThread().getId()}
+  */
+ static String getEsIndex() {
+   final long callerThreadId = Thread.currentThread().getId();
+   return "beam" + callerThreadId;
+ }
+
static final String ES_TYPE = "test";
static final long NUM_DOCS_UTESTS = 400L;
static final long NUM_DOCS_ITESTS = 50000L;