Upgraded ElasticSearch to get rid of CVEs (and switched client to OpenSearch one) (#13867)

* Upgraded ElasticSearch to get rid of CVEs. (#13747)

* Upgraded ElasticSearch to get rid of CVEs.

CVE-2020-7020
CVE-2020-7021
CVE-2021-22132
CVE-2021-22134
CVE-2021-22144
CVE-2021-22147

* Elastic search client version >= 7.11 no longer works with OSS Elastic images (and elastic.co no longer releases OSS images)

* Fixed tests for Elasticsearch

* pom cleanup

* Switched to OpenSearch client for Elastic (Apache 2 licensed)
diff --git a/pom.xml b/pom.xml
index 4444201..d91cbfa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,7 +153,7 @@
     <mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
     <hdfs-offload-version3>3.3.1</hdfs-offload-version3>
     <json-smart.version>2.4.7</json-smart.version>
-    <elasticsearch.version>7.9.1</elasticsearch.version>
+    <opensearch.version>1.2.4</opensearch.version>
     <presto.version>332</presto.version>
     <scala.binary.version>2.13</scala.binary.version>
     <scala-library.version>2.13.6</scala-library.version>
@@ -1116,9 +1116,9 @@
       </dependency>
 
       <dependency>
-        <groupId>org.elasticsearch.client</groupId>
-        <artifactId>elasticsearch-rest-high-level-client</artifactId>
-        <version>${elasticsearch.version}</version>
+        <groupId>org.opensearch.client</groupId>
+        <artifactId>opensearch-rest-high-level-client</artifactId>
+        <version>${opensearch.version}</version>
       </dependency>
 
       <dependency>
@@ -1328,6 +1328,7 @@
         </dependencies>
       </plugin>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
           <encoding>UTF-8</encoding>
@@ -1845,6 +1846,7 @@
         <pluginManagement>
           <plugins>
             <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-compiler-plugin</artifactId>
               <configuration>
                 <!-- for some reason, setting maven.compiler.release property alone doesn't work -->
diff --git a/pulsar-io/elastic-search/pom.xml b/pulsar-io/elastic-search/pom.xml
index a4fcc5a..c13c6ac 100644
--- a/pulsar-io/elastic-search/pom.xml
+++ b/pulsar-io/elastic-search/pom.xml
@@ -28,6 +28,14 @@
   <artifactId>pulsar-io-elastic-search</artifactId>
   <name>Pulsar IO :: ElasticSearch</name>
 
+  <properties>
+    <!--
+     Work-around for "Container exited with code 137" (OOM)
+    -->
+    <testReuseFork>false</testReuseFork>
+    <testForkCount>1</testForkCount>
+  </properties>
+
   <dependencies>
 
     <dependency>
@@ -76,14 +84,18 @@
     </dependency>
 
     <dependency>
-      <groupId>org.elasticsearch.client</groupId>
-      <artifactId>elasticsearch-rest-high-level-client</artifactId>
+      <groupId>org.opensearch.client</groupId>
+      <artifactId>opensearch-rest-high-level-client</artifactId>
     </dependency>
 
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>elasticsearch</artifactId>
-      <version>1.15.3</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>toxiproxy</artifactId>
       <scope>test</scope>
     </dependency>
 
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index 2852532..78b8c86 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -72,35 +72,37 @@
 import org.apache.http.ssl.SSLContexts;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.functions.api.Record;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.client.indices.CreateIndexResponse;
-import org.elasticsearch.client.indices.GetIndexRequest;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.admin.indices.create.CreateIndexRequest;
+import org.opensearch.action.admin.indices.create.CreateIndexResponse;
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.opensearch.action.admin.indices.refresh.RefreshRequest;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.delete.DeleteResponse;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.action.support.master.AcknowledgedResponse;
+import org.opensearch.client.Node;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.Requests;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.client.indices.GetIndexRequest;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.ByteSizeUnit;
+import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.index.query.QueryBuilders;
+import org.opensearch.search.builder.SearchSourceBuilder;
 
 @Slf4j
 public class ElasticSearchClient implements AutoCloseable {
@@ -464,7 +466,7 @@
     }
 
     @VisibleForTesting
-    protected org.elasticsearch.action.search.SearchResponse search(String indexName) throws IOException {
+    protected SearchResponse search(String indexName) throws IOException {
         client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
         return client.search(
                 new SearchRequest()
@@ -474,7 +476,7 @@
     }
 
     @VisibleForTesting
-    protected org.elasticsearch.action.support.master.AcknowledgedResponse delete(String indexName) throws IOException {
+    protected AcknowledgedResponse delete(String indexName) throws IOException {
         return client.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
     }
 
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java
index 1538775..1f71eb2 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java
@@ -20,8 +20,8 @@
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.common.unit.TimeValue;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.common.unit.TimeValue;
 
 public class RandomExponentialBackoffPolicy extends BackoffPolicy {
     private final RandomExponentialRetry randomExponentialRetry;
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
index 43fffab..2070a46 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
@@ -34,7 +34,7 @@
 public class ElasticSearchClientSslTests {
 
     public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
-            .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.10.2-amd64");
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
 
     final static String INDEX = "myindex";
 
@@ -44,9 +44,8 @@
     @Test
     public void testSslBasic() throws IOException {
         try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
-                .withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
                 .withFileSystemBind(sslResourceDir, configDir + "/ssl")
-                .withEnv("ELASTIC_PASSWORD","elastic")  // boostrap password
+                .withPassword("elastic")
                 .withEnv("xpack.license.self_generated.type", "trial")
                 .withEnv("xpack.security.enabled", "true")
                 .withEnv("xpack.security.http.ssl.enabled", "true")
@@ -80,9 +79,8 @@
     @Test
     public void testSslWithHostnameVerification() throws IOException {
         try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
-                .withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
                 .withFileSystemBind(sslResourceDir, configDir + "/ssl")
-                .withEnv("ELASTIC_PASSWORD","elastic")  // boostrap password
+                .withPassword("elastic")
                 .withEnv("xpack.license.self_generated.type", "trial")
                 .withEnv("xpack.security.enabled", "true")
                 .withEnv("xpack.security.http.ssl.enabled", "true")
@@ -119,9 +117,8 @@
     @Test
     public void testSslWithClientAuth() throws IOException {
         try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
-                .withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
                 .withFileSystemBind(sslResourceDir, configDir + "/ssl")
-                .withEnv("ELASTIC_PASSWORD","elastic")  // boostrap password
+                .withPassword("elastic")
                 .withEnv("xpack.license.self_generated.type", "trial")
                 .withEnv("xpack.security.enabled", "true")
                 .withEnv("xpack.security.http.ssl.enabled", "true")
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 223bbfe..41bbe22 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -18,16 +18,18 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
+import eu.rekawek.toxiproxy.model.ToxicDirection;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.elasticsearch.testcontainers.ChaosContainer;
+import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
 import org.awaitility.Awaitility;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
 import org.junit.AfterClass;
 import org.mockito.Mockito;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.testcontainers.containers.Network;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -45,21 +47,22 @@
 
 @Slf4j
 public class ElasticSearchClientTests {
-
     public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
-            .orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
 
     static ElasticsearchContainer container;
+    static Network network = Network.newNetwork();
 
     @BeforeClass
     public static final void initBeforeClass() throws IOException {
-        container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE);
+        container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE).withNetwork(network);
         container.start();
     }
 
     @AfterClass
     public static void closeAfterClass() {
         container.close();
+        network.close();
     }
 
     static class MockRecord<T> implements Record<T> {
@@ -222,101 +225,111 @@
 
     @Test
     public void testBulkRetry() throws Exception {
-        final String index = "indexbulktest-" + UUID.randomUUID();
-        ElasticSearchConfig config = new ElasticSearchConfig()
-                .setElasticSearchUrl("http://"+container.getHttpHostAddress())
-                .setIndexName(index)
-                .setBulkEnabled(true)
-                .setMaxRetries(1000)
-                .setBulkActions(2)
-                .setRetryBackoffInMs(100)
-                // disabled, we want to have full control over flush() method
-                .setBulkFlushIntervalInMs(-1);
+        try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) {
+            toxiproxy.start();
 
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
-            try {
-                assertTrue(client.createIndexIfNeeded(index));
-                MockRecord<GenericObject> mockRecord = new MockRecord<>();
-                client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
-                client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
-                assertEquals(mockRecord.acked, 2);
-                assertEquals(mockRecord.failed, 0);
-                assertEquals(client.totalHits(index), 2);
+            final String index = "indexbulktest-" + UUID.randomUUID();
+            ElasticSearchConfig config = new ElasticSearchConfig()
+                    .setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress())
+                    .setIndexName(index)
+                    .setBulkEnabled(true)
+                    .setMaxRetries(1000)
+                    .setBulkActions(2)
+                    .setRetryBackoffInMs(100)
+                    // disabled, we want to have full control over flush() method
+                    .setBulkFlushIntervalInMs(-1);
 
-                ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 15);
-                chaosContainer.start();
+            try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+                try {
+                    assertTrue(client.createIndexIfNeeded(index));
+                    MockRecord<GenericObject> mockRecord = new MockRecord<>();
+                    client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
+                    client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
+                    assertEquals(mockRecord.acked, 2);
+                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(client.totalHits(index), 2);
 
-                client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
-                assertEquals(mockRecord.acked, 2);
-                assertEquals(mockRecord.failed, 0);
-                assertEquals(client.totalHits(index), 2);
+                    log.info("starting the toxic");
+                    toxiproxy.getProxy().setConnectionCut(false);
+                    toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 15000);
+                    toxiproxy.removeToxicAfterDelay("elasticpause", 15000);
 
-                chaosContainer.stop();
-                client.flush();
-                assertEquals(mockRecord.acked, 3);
-                assertEquals(mockRecord.failed, 0);
-                assertEquals(client.totalHits(index), 3);
-            } finally {
-                client.delete(index);
+                    client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
+                    assertEquals(mockRecord.acked, 2);
+                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(client.totalHits(index), 2);
+
+                    client.flush();
+                    assertEquals(mockRecord.acked, 3);
+                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(client.totalHits(index), 3);
+                } finally {
+                    client.delete(index);
+                }
             }
         }
     }
 
     @Test
     public void testBulkBlocking() throws Exception {
-        final String index = "indexblocking-" + UUID.randomUUID();
-        ElasticSearchConfig config = new ElasticSearchConfig()
-                .setElasticSearchUrl("http://"+container.getHttpHostAddress())
-                .setIndexName(index)
-                .setBulkEnabled(true)
-                .setMaxRetries(1000)
-                .setBulkActions(2)
-                .setBulkConcurrentRequests(2)
-                .setRetryBackoffInMs(100)
-                .setBulkFlushIntervalInMs(10000);
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
-            assertTrue(client.createIndexIfNeeded(index));
+        try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) {
+            toxiproxy.start();
 
-            try {
-                MockRecord<GenericObject> mockRecord = new MockRecord<>();
-                for (int i = 1; i <= 5; i++) {
-                    client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
-                }
+            final String index = "indexblocking-" + UUID.randomUUID();
+            ElasticSearchConfig config = new ElasticSearchConfig()
+                    .setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress())
+                    .setIndexName(index)
+                    .setBulkEnabled(true)
+                    .setMaxRetries(1000)
+                    .setBulkActions(2)
+                    .setBulkConcurrentRequests(2)
+                    .setRetryBackoffInMs(100)
+                    .setBulkFlushIntervalInMs(10000);
+            try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+                assertTrue(client.createIndexIfNeeded(index));
 
-                Awaitility.await().untilAsserted(() -> {
-                    assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
+                try {
+                    MockRecord<GenericObject> mockRecord = new MockRecord<>();
+                    for (int i = 1; i <= 5; i++) {
+                        client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
+                    }
+
+                    Awaitility.await().untilAsserted(() -> {
+                        assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
+                        assertEquals(mockRecord.failed, 0);
+                        assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
+                    });
+                    client.flush();
+                    Awaitility.await().untilAsserted(() -> {
+                        assertEquals(mockRecord.failed, 0);
+                        assertEquals(mockRecord.acked, 5);
+                        assertEquals(client.totalHits(index), 5);
+                    });
+
+                    log.info("starting the toxic");
+                    toxiproxy.getProxy().setConnectionCut(false);
+                    toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 30000);
+                    toxiproxy.removeToxicAfterDelay("elasticpause", 30000);
+
+                    long start = System.currentTimeMillis();
+
+                    // 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
+                    for (int i = 6; i <= 15; i++) {
+                        client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
+                        log.info("{} index {}", System.currentTimeMillis(), i);
+                    }
+                    long elapsed = System.currentTimeMillis() - start;
+                    log.info("elapsed = {}", elapsed);
+                    assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy
+
+                    Thread.sleep(3000L);
+                    assertEquals(mockRecord.acked, 15);
                     assertEquals(mockRecord.failed, 0);
-                    assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
-                });
-                client.flush();
-                Awaitility.await().untilAsserted(() -> {
-                    assertEquals(mockRecord.failed, 0);
-                    assertEquals(mockRecord.acked, 5);
-                    assertEquals(client.totalHits(index), 5);
-                });
+                    assertEquals(client.records.size(), 0);
 
-                ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 30);
-                chaosContainer.start();
-                Thread.sleep(1000L);
-
-                // 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
-                long start = System.currentTimeMillis();
-                for (int i = 6; i <= 15; i++) {
-                    client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
-                    log.info("{} index {}", System.currentTimeMillis(), i);
+                } finally {
+                    client.delete(index);
                 }
-                long elapsed = System.currentTimeMillis() - start;
-                log.info("elapsed = {}", elapsed);
-                assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy
-
-                Thread.sleep(1000L);
-                assertEquals(mockRecord.acked, 15);
-                assertEquals(mockRecord.failed, 0);
-                assertEquals(client.records.size(), 0);
-
-                chaosContainer.stop();
-            } finally {
-                client.delete(index);
             }
         }
     }
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
index feae446..a4ecd64 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java
@@ -46,7 +46,7 @@
 public class ElasticSearchSinkRawDataTests {
 
     public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
-            .orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
 
     private static ElasticsearchContainer container;
 
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index c752354..0377c76 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -40,11 +40,11 @@
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.elasticsearch.data.UserProfile;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.opensearch.client.Node;
+import org.opensearch.client.RestHighLevelClient;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -57,7 +57,7 @@
 public class ElasticSearchSinkTests {
 
     public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
-            .orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
 
     private static ElasticsearchContainer container;
 
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
deleted file mode 100644
index 4b296bb..0000000
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ChaosContainer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.elasticsearch.testcontainers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.containers.wait.strategy.WaitStrategy;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-
-// see https://github.com/alexei-led/pumba
-@Slf4j
-public class ChaosContainer<SELF extends ChaosContainer<SELF>> extends GenericContainer<SELF> {
-
-    public static final String PUMBA_IMAGE = Optional.ofNullable(System.getenv("PUMBA_IMAGE"))
-            .orElse("gaiaadm/pumba:0.8.0");
-
-    private final List<String> logs = new ArrayList<>();
-    private Consumer<ChaosContainer> beforeStop;
-
-    public static ChaosContainer pauseContainerForSeconds(String targetContainer, int seconds) {
-        return new ChaosContainer(targetContainer, "pause --duration " + seconds + "s", Wait.forLogMessage(".*pausing container.*", 1),
-                (Consumer<ChaosContainer>) chaosContainer -> Awaitility
-                        .await()
-                        .atMost(seconds + 5, TimeUnit.SECONDS)
-                        .until(() -> {
-                                    boolean found = chaosContainer.logs.stream().anyMatch((Predicate<String>) line -> line.contains("stop pausing container"));
-                                    if (!found) {
-                                        log.debug("ChaosContainer stop requested. waiting for \"stop pausing container\" log");
-                                        log.debug(String.join("\n", chaosContainer.logs));
-                                    }
-                                    return found;
-                                }
-                        ));
-    }
-
-    private ChaosContainer(String targetContainer, String command, WaitStrategy waitStrategy, Consumer<ChaosContainer> beforeStop) {
-        super(PUMBA_IMAGE);
-        setCommand("--log-level info " + command + " " + targetContainer);
-        addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE);
-        setWaitStrategy(waitStrategy);
-        withLogConsumer(o -> {
-            final String string = o.getUtf8String();
-            log.info("pumba> {}", string);
-            logs.add(string);
-        });
-        this.beforeStop = beforeStop;
-    }
-
-    @Override
-    public void stop() {
-        if (getContainerId() != null && beforeStop != null) {
-            beforeStop.accept(this);
-        }
-        super.stop();
-    }
-}
\ No newline at end of file
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ElasticToxiproxiContainer.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ElasticToxiproxiContainer.java
new file mode 100644
index 0000000..348c350
--- /dev/null
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ElasticToxiproxiContainer.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.elasticsearch.testcontainers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.ToxiproxyContainer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Timer;
+import java.util.TimerTask;
+
+@Slf4j
+// Toxiproxy container, which will be used as a TCP proxy
+public class ElasticToxiproxiContainer extends ToxiproxyContainer {
+
+    public static final DockerImageName TOXIPROXY_IMAGE = DockerImageName.parse("shopify/toxiproxy:2.1.4");
+
+    final ElasticsearchContainer container;
+    ToxiproxyContainer.ContainerProxy proxy;
+
+    public ElasticToxiproxiContainer(ElasticsearchContainer container, Network network) {
+        super(TOXIPROXY_IMAGE);
+        this.withNetwork(network);
+        this.container = container;
+    }
+
+    @Override
+    public void start() {
+        log.info("Starting toxiproxy container");
+        super.start();
+        proxy = this.getProxy(container, 9200);
+    }
+
+    public String getHttpHostAddress() {
+        Objects.nonNull(proxy);
+        return proxy.getContainerIpAddress() + ":" + proxy.getProxyPort();
+    }
+
+    public ToxiproxyContainer.ContainerProxy getProxy() {
+        Objects.nonNull(proxy);
+        return proxy;
+    }
+
+    public void removeToxicAfterDelay(String toxicName, long delayMs) {
+        Objects.nonNull(proxy);
+        Timer timer = new Timer();
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    log.info("removing the toxic {}", toxicName);
+                    proxy.toxics().get(toxicName).remove();
+                } catch (IOException e) {
+                    log.error("failed to remove toxic " + toxicName, e);
+                }
+            }
+        }, delayMs);
+    }
+
+}
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 5ff2ee4..a64141f 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -146,8 +146,8 @@
     </dependency>
 
     <dependency>
-  	  <groupId>org.elasticsearch.client</groupId>
-  	  <artifactId>elasticsearch-rest-high-level-client</artifactId>
+      <groupId>org.opensearch.client</groupId>
+      <artifactId>opensearch-rest-high-level-client</artifactId>
       <scope>test</scope>
   	</dependency>
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
index 4b859a7..75f80bf 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
@@ -39,12 +39,12 @@
 import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
 
 @Slf4j
 public class ElasticSearchSinkTester extends SinkTester<ElasticSearchContainer> {