Upgraded ElasticSearch to get rid of CVEs (and switched client to OpenSearch one) (#13867)
* Upgraded ElasticSearch to get rid of CVEs. (#13747)
* Upgraded ElasticSearch to get rid of CVEs.
CVE-2020-7020
CVE-2020-7021
CVE-2021-22132
CVE-2021-22134
CVE-2021-22144
CVE-2021-22147
* Elastic search client version >= 7.11 no longer works with OSS Elastic images (and elastic.co no longer releases OSS images)
* Fixed tests for Elasticsearch
* pom cleanup
* Switched to OpenSearch client for Elastic (Apache 2 licensed)
diff --git a/pom.xml b/pom.xml
index 4444201..d91cbfa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,7 +153,7 @@
<mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
<hdfs-offload-version3>3.3.1</hdfs-offload-version3>
<json-smart.version>2.4.7</json-smart.version>
- <elasticsearch.version>7.9.1</elasticsearch.version>
+ <opensearch.version>1.2.4</opensearch.version>
<presto.version>332</presto.version>
<scala.binary.version>2.13</scala.binary.version>
<scala-library.version>2.13.6</scala-library.version>
@@ -1116,9 +1116,9 @@
</dependency>
<dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>${elasticsearch.version}</version>
+ <groupId>org.opensearch.client</groupId>
+ <artifactId>opensearch-rest-high-level-client</artifactId>
+ <version>${opensearch.version}</version>
</dependency>
<dependency>
@@ -1328,6 +1328,7 @@
</dependencies>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
@@ -1845,6 +1846,7 @@
<pluginManagement>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- for some reason, setting maven.compiler.release property alone doesn't work -->
diff --git a/pulsar-io/elastic-search/pom.xml b/pulsar-io/elastic-search/pom.xml
index a4fcc5a..c13c6ac 100644
--- a/pulsar-io/elastic-search/pom.xml
+++ b/pulsar-io/elastic-search/pom.xml
@@ -28,6 +28,14 @@
<artifactId>pulsar-io-elastic-search</artifactId>
<name>Pulsar IO :: ElasticSearch</name>
+ <properties>
+ <!--
+ Work-around for "Container exited with code 137" (OOM)
+ -->
+ <testReuseFork>false</testReuseFork>
+ <testForkCount>1</testForkCount>
+ </properties>
+
<dependencies>
<dependency>
@@ -76,14 +84,18 @@
</dependency>
<dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <groupId>org.opensearch.client</groupId>
+ <artifactId>opensearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
- <version>1.15.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>toxiproxy</artifactId>
<scope>test</scope>
</dependency>
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index 2852532..78b8c86 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -72,35 +72,37 @@
import org.apache.http.ssl.SSLContexts;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.client.indices.CreateIndexResponse;
-import org.elasticsearch.client.indices.GetIndexRequest;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.admin.indices.create.CreateIndexRequest;
+import org.opensearch.action.admin.indices.create.CreateIndexResponse;
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.opensearch.action.admin.indices.refresh.RefreshRequest;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.delete.DeleteResponse;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.action.support.master.AcknowledgedResponse;
+import org.opensearch.client.Node;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.Requests;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.client.indices.GetIndexRequest;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.ByteSizeUnit;
+import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.index.query.QueryBuilders;
+import org.opensearch.search.builder.SearchSourceBuilder;
@Slf4j
public class ElasticSearchClient implements AutoCloseable {
@@ -464,7 +466,7 @@
}
@VisibleForTesting
- protected org.elasticsearch.action.search.SearchResponse search(String indexName) throws IOException {
+ protected SearchResponse search(String indexName) throws IOException {
client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
return client.search(
new SearchRequest()
@@ -474,7 +476,7 @@
}
@VisibleForTesting
- protected org.elasticsearch.action.support.master.AcknowledgedResponse delete(String indexName) throws IOException {
+ protected AcknowledgedResponse delete(String indexName) throws IOException {
return client.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
}
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java
index 1538775..1f71eb2 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java
@@ -20,8 +20,8 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.common.unit.TimeValue;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.common.unit.TimeValue;
public class RandomExponentialBackoffPolicy extends BackoffPolicy {
private final RandomExponentialRetry randomExponentialRetry;
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
index 43fffab..2070a46 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
@@ -34,7 +34,7 @@
public class ElasticSearchClientSslTests {
public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
- .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.10.2-amd64");
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
final static String INDEX = "myindex";
@@ -44,9 +44,8 @@
@Test
public void testSslBasic() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
- .withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
- .withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
+ .withPassword("elastic")
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
@@ -80,9 +79,8 @@
@Test
public void testSslWithHostnameVerification() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
- .withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
- .withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
+ .withPassword("elastic")
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
@@ -119,9 +117,8 @@
@Test
public void testSslWithClientAuth() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
- .withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
- .withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
+ .withPassword("elastic")
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 223bbfe..41bbe22 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -18,16 +18,18 @@
*/
package org.apache.pulsar.io.elasticsearch;
+import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.elasticsearch.testcontainers.ChaosContainer;
+import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
import org.awaitility.Awaitility;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
import org.junit.AfterClass;
import org.mockito.Mockito;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.testcontainers.containers.Network;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -45,21 +47,22 @@
@Slf4j
public class ElasticSearchClientTests {
-
public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
- .orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
static ElasticsearchContainer container;
+ static Network network = Network.newNetwork();
@BeforeClass
public static final void initBeforeClass() throws IOException {
- container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE);
+ container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE).withNetwork(network);
container.start();
}
@AfterClass
public static void closeAfterClass() {
container.close();
+ network.close();
}
static class MockRecord<T> implements Record<T> {
@@ -222,101 +225,111 @@
@Test
public void testBulkRetry() throws Exception {
- final String index = "indexbulktest-" + UUID.randomUUID();
- ElasticSearchConfig config = new ElasticSearchConfig()
- .setElasticSearchUrl("http://"+container.getHttpHostAddress())
- .setIndexName(index)
- .setBulkEnabled(true)
- .setMaxRetries(1000)
- .setBulkActions(2)
- .setRetryBackoffInMs(100)
- // disabled, we want to have full control over flush() method
- .setBulkFlushIntervalInMs(-1);
+ try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) {
+ toxiproxy.start();
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
- try {
- assertTrue(client.createIndexIfNeeded(index));
- MockRecord<GenericObject> mockRecord = new MockRecord<>();
- client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
- client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
- assertEquals(mockRecord.acked, 2);
- assertEquals(mockRecord.failed, 0);
- assertEquals(client.totalHits(index), 2);
+ final String index = "indexbulktest-" + UUID.randomUUID();
+ ElasticSearchConfig config = new ElasticSearchConfig()
+ .setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress())
+ .setIndexName(index)
+ .setBulkEnabled(true)
+ .setMaxRetries(1000)
+ .setBulkActions(2)
+ .setRetryBackoffInMs(100)
+ // disabled, we want to have full control over flush() method
+ .setBulkFlushIntervalInMs(-1);
- ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 15);
- chaosContainer.start();
+ try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try {
+ assertTrue(client.createIndexIfNeeded(index));
+ MockRecord<GenericObject> mockRecord = new MockRecord<>();
+ client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
+ client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
+ assertEquals(mockRecord.acked, 2);
+ assertEquals(mockRecord.failed, 0);
+ assertEquals(client.totalHits(index), 2);
- client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
- assertEquals(mockRecord.acked, 2);
- assertEquals(mockRecord.failed, 0);
- assertEquals(client.totalHits(index), 2);
+ log.info("starting the toxic");
+ toxiproxy.getProxy().setConnectionCut(false);
+ toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 15000);
+ toxiproxy.removeToxicAfterDelay("elasticpause", 15000);
- chaosContainer.stop();
- client.flush();
- assertEquals(mockRecord.acked, 3);
- assertEquals(mockRecord.failed, 0);
- assertEquals(client.totalHits(index), 3);
- } finally {
- client.delete(index);
+ client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
+ assertEquals(mockRecord.acked, 2);
+ assertEquals(mockRecord.failed, 0);
+ assertEquals(client.totalHits(index), 2);
+
+ client.flush();
+ assertEquals(mockRecord.acked, 3);
+ assertEquals(mockRecord.failed, 0);
+ assertEquals(client.totalHits(index), 3);
+ } finally {
+ client.delete(index);
+ }
}
}
}
@Test
public void testBulkBlocking() throws Exception {
- final String index = "indexblocking-" + UUID.randomUUID();
- ElasticSearchConfig config = new ElasticSearchConfig()
- .setElasticSearchUrl("http://"+container.getHttpHostAddress())
- .setIndexName(index)
- .setBulkEnabled(true)
- .setMaxRetries(1000)
- .setBulkActions(2)
- .setBulkConcurrentRequests(2)
- .setRetryBackoffInMs(100)
- .setBulkFlushIntervalInMs(10000);
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
- assertTrue(client.createIndexIfNeeded(index));
+ try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) {
+ toxiproxy.start();
- try {
- MockRecord<GenericObject> mockRecord = new MockRecord<>();
- for (int i = 1; i <= 5; i++) {
- client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
- }
+ final String index = "indexblocking-" + UUID.randomUUID();
+ ElasticSearchConfig config = new ElasticSearchConfig()
+ .setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress())
+ .setIndexName(index)
+ .setBulkEnabled(true)
+ .setMaxRetries(1000)
+ .setBulkActions(2)
+ .setBulkConcurrentRequests(2)
+ .setRetryBackoffInMs(100)
+ .setBulkFlushIntervalInMs(10000);
+ try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ assertTrue(client.createIndexIfNeeded(index));
- Awaitility.await().untilAsserted(() -> {
- assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
+ try {
+ MockRecord<GenericObject> mockRecord = new MockRecord<>();
+ for (int i = 1; i <= 5; i++) {
+ client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
+ }
+
+ Awaitility.await().untilAsserted(() -> {
+ assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
+ assertEquals(mockRecord.failed, 0);
+ assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
+ });
+ client.flush();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(mockRecord.failed, 0);
+ assertEquals(mockRecord.acked, 5);
+ assertEquals(client.totalHits(index), 5);
+ });
+
+ log.info("starting the toxic");
+ toxiproxy.getProxy().setConnectionCut(false);
+ toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 30000);
+ toxiproxy.removeToxicAfterDelay("elasticpause", 30000);
+
+ long start = System.currentTimeMillis();
+
+ // 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
+ for (int i = 6; i <= 15; i++) {
+ client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
+ log.info("{} index {}", System.currentTimeMillis(), i);
+ }
+ long elapsed = System.currentTimeMillis() - start;
+ log.info("elapsed = {}", elapsed);
+ assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy
+
+ Thread.sleep(3000L);
+ assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
- assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
- });
- client.flush();
- Awaitility.await().untilAsserted(() -> {
- assertEquals(mockRecord.failed, 0);
- assertEquals(mockRecord.acked, 5);
- assertEquals(client.totalHits(index), 5);
- });
+ assertEquals(client.records.size(), 0);
- ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 30);
- chaosContainer.start();
- Thread.sleep(1000L);
-
- // 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
- long start = System.currentTimeMillis();
- for (int i = 6; i <= 15; i++) {
- client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
- log.info("{} index {}", System.currentTimeMillis(), i);
+ } finally {
+ client.delete(index);
}
- long elapsed = System.currentTimeMillis() - start;
- log.info("elapsed = {}", elapsed);
- assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy
-
- Thread.sleep(1000L);
- assertEquals(mockRecord.acked, 15);
- assertEquals(mockRecord.failed, 0);
- assertEquals(client.records.size(), 0);
-
- chaosContainer.stop();
- } finally {
- client.delete(index);
}
}
}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
index feae446..a4ecd64 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
@@ -46,7 +46,7 @@
public class ElasticSearchSinkRawDataTests {
public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
- .orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
private static ElasticsearchContainer container;
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index c752354..0377c76 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -40,11 +40,11 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.data.UserProfile;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RestHighLevelClient;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opensearch.client.Node;
+import org.opensearch.client.RestHighLevelClient;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -57,7 +57,7 @@
public class ElasticSearchSinkTests {
public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
- .orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
private static ElasticsearchContainer container;
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
deleted file mode 100644
index 4b296bb..0000000
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.elasticsearch.testcontainers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.containers.wait.strategy.WaitStrategy;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-
-// see https://github.com/alexei-led/pumba
-@Slf4j
-public class ChaosContainer<SELF extends ChaosContainer<SELF>> extends GenericContainer<SELF> {
-
- public static final String PUMBA_IMAGE = Optional.ofNullable(System.getenv("PUMBA_IMAGE"))
- .orElse("gaiaadm/pumba:0.8.0");
-
- private final List<String> logs = new ArrayList<>();
- private Consumer<ChaosContainer> beforeStop;
-
- public static ChaosContainer pauseContainerForSeconds(String targetContainer, int seconds) {
- return new ChaosContainer(targetContainer, "pause --duration " + seconds + "s", Wait.forLogMessage(".*pausing container.*", 1),
- (Consumer<ChaosContainer>) chaosContainer -> Awaitility
- .await()
- .atMost(seconds + 5, TimeUnit.SECONDS)
- .until(() -> {
- boolean found = chaosContainer.logs.stream().anyMatch((Predicate<String>) line -> line.contains("stop pausing container"));
- if (!found) {
- log.debug("ChaosContainer stop requested. waiting for \"stop pausing container\" log");
- log.debug(String.join("\n", chaosContainer.logs));
- }
- return found;
- }
- ));
- }
-
- private ChaosContainer(String targetContainer, String command, WaitStrategy waitStrategy, Consumer<ChaosContainer> beforeStop) {
- super(PUMBA_IMAGE);
- setCommand("--log-level info " + command + " " + targetContainer);
- addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE);
- setWaitStrategy(waitStrategy);
- withLogConsumer(o -> {
- final String string = o.getUtf8String();
- log.info("pumba> {}", string);
- logs.add(string);
- });
- this.beforeStop = beforeStop;
- }
-
- @Override
- public void stop() {
- if (getContainerId() != null && beforeStop != null) {
- beforeStop.accept(this);
- }
- super.stop();
- }
-}
\ No newline at end of file
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ElasticToxiproxiContainer.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ElasticToxiproxiContainer.java
new file mode 100644
index 0000000..348c350
--- /dev/null
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ElasticToxiproxiContainer.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.elasticsearch.testcontainers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.ToxiproxyContainer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Timer;
+import java.util.TimerTask;
+
+@Slf4j
+// Toxiproxy container, which will be used as a TCP proxy
+public class ElasticToxiproxiContainer extends ToxiproxyContainer {
+
+ public static final DockerImageName TOXIPROXY_IMAGE = DockerImageName.parse("shopify/toxiproxy:2.1.4");
+
+ final ElasticsearchContainer container;
+ ToxiproxyContainer.ContainerProxy proxy;
+
+ public ElasticToxiproxiContainer(ElasticsearchContainer container, Network network) {
+ super(TOXIPROXY_IMAGE);
+ this.withNetwork(network);
+ this.container = container;
+ }
+
+ @Override
+ public void start() {
+ log.info("Starting toxiproxy container");
+ super.start();
+ proxy = this.getProxy(container, 9200);
+ }
+
+ public String getHttpHostAddress() {
+ Objects.nonNull(proxy);
+ return proxy.getContainerIpAddress() + ":" + proxy.getProxyPort();
+ }
+
+ public ToxiproxyContainer.ContainerProxy getProxy() {
+ Objects.nonNull(proxy);
+ return proxy;
+ }
+
+ public void removeToxicAfterDelay(String toxicName, long delayMs) {
+ Objects.nonNull(proxy);
+ Timer timer = new Timer();
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ log.info("removing the toxic {}", toxicName);
+ proxy.toxics().get(toxicName).remove();
+ } catch (IOException e) {
+ log.error("failed to remove toxic " + toxicName, e);
+ }
+ }
+ }, delayMs);
+ }
+
+}
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 5ff2ee4..a64141f 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -146,8 +146,8 @@
</dependency>
<dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <groupId>org.opensearch.client</groupId>
+ <artifactId>opensearch-rest-high-level-client</artifactId>
<scope>test</scope>
</dependency>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
index 4b859a7..75f80bf 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
@@ -39,12 +39,12 @@
import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
@Slf4j
public class ElasticSearchSinkTester extends SinkTester<ElasticSearchContainer> {