[FLINK-17159] Harden ElasticsearchSinkITCase
Before, it could happen that the embedded node is not ready. Now we wait
for nodes/data nodes to be live before returning from the initialization
method.
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
index 6f185d3..68e0fe8 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
@@ -21,12 +21,21 @@
import org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironment;
import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ClusterAdminClient;
+import org.elasticsearch.common.unit.TimeValue;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
/**
* A resource that starts an embedded elasticsearch cluster.
*/
@@ -55,6 +64,27 @@
tempFolder.create();
embeddedNodeEnv.start(tempFolder.newFolder(), clusterName);
+
+ waitForCluster();
+ }
+
+ /**
+ * Blocks until the cluster is ready and data nodes/nodes are live.
+ */
+ private void waitForCluster() {
+ AdminClient adminClient = embeddedNodeEnv.getClient().admin();
+ ClusterAdminClient clusterAdminClient = adminClient.cluster();
+
+ ClusterHealthRequestBuilder requestBuilder = clusterAdminClient.prepareHealth("_all");
+ requestBuilder = requestBuilder.setTimeout(TimeValue.timeValueSeconds(120));
+
+ ActionFuture<ClusterHealthResponse> healthFuture =
+ clusterAdminClient.health(requestBuilder.request());
+
+ ClusterHealthResponse health = healthFuture.actionGet(TimeValue.timeValueSeconds(120));
+
+ assertThat(health.getNumberOfNodes(), greaterThanOrEqualTo(1));
+ assertThat(health.getNumberOfDataNodes(), greaterThanOrEqualTo(1));
}
@Override