[FLINK-17159] Harden ElasticsearchSinkITCase

Before, it could happen that the embedded node is not ready. Now we wait
for nodes/data nodes to be live before returning from the initialization
method.
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
index 6f185d3..68e0fe8 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
@@ -21,12 +21,21 @@
 import org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironment;
 import org.apache.flink.util.InstantiationUtil;
 
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ClusterAdminClient;
+import org.elasticsearch.common.unit.TimeValue;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
 /**
  * A resource that starts an embedded elasticsearch cluster.
  */
@@ -55,6 +64,27 @@
 
 		tempFolder.create();
 		embeddedNodeEnv.start(tempFolder.newFolder(), clusterName);
+
+		waitForCluster();
+	}
+
+	/**
+	 * Blocks until the cluster is ready and data nodes/nodes are live.
+	 */
+	private void waitForCluster() {
+		AdminClient adminClient = embeddedNodeEnv.getClient().admin();
+		ClusterAdminClient clusterAdminClient = adminClient.cluster();
+
+		ClusterHealthRequestBuilder requestBuilder = clusterAdminClient.prepareHealth("_all");
+		requestBuilder = requestBuilder.setTimeout(TimeValue.timeValueSeconds(120));
+
+		ActionFuture<ClusterHealthResponse> healthFuture =
+				clusterAdminClient.health(requestBuilder.request());
+
+		ClusterHealthResponse health = healthFuture.actionGet(TimeValue.timeValueSeconds(120));
+
+		assertThat(health.getNumberOfNodes(), greaterThanOrEqualTo(1));
+		assertThat(health.getNumberOfDataNodes(), greaterThanOrEqualTo(1));
 	}
 
 	@Override