[FLINK-36633] Add support for Flink 1.20 in Flink OpenSearch connector



Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index 6db10f2..b8e7cf5 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -37,7 +37,12 @@
           jdk: '8, 11, 17, 21',
           branch: main
         }, {
-          flink: 1.20-SNAPSHOT,
+            flink: 1.20-SNAPSHOT,
+            jdk: '8, 11, 17, 21',
+            branch: main
+          }, 
+        {
+          flink: 1.20,
           jdk: '8, 11, 17, 21',
           branch: main
         }, {
diff --git a/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java b/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java
index 1ee0b2e..57bcc85 100644
--- a/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java
+++ b/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java
@@ -22,6 +22,6 @@
  * integration tests.
  */
 public class DockerImageVersions {
-    public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.15";
-    public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.13.0";
+    public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.19";
+    public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.17.1";
 }
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
index cf35411..bb38777 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
@@ -17,17 +17,15 @@
 
 package org.apache.flink.connector.opensearch.sink;
 
-import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SimpleUserCodeClassLoader;
 import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.apache.http.HttpHost;
 import org.junit.jupiter.api.DynamicTest;
@@ -163,8 +161,7 @@
                         TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
                                 new UnregisteredMetricsGroup()));
 
-        Mockito.when(sinkInitContext.getMailboxExecutor())
-                .thenReturn(new OpensearchSinkBuilderTest.DummyMailboxExecutor());
+        Mockito.when(sinkInitContext.getMailboxExecutor()).thenReturn(new SyncMailboxExecutor());
         Mockito.when(sinkInitContext.getProcessingTimeService())
                 .thenReturn(new TestProcessingTimeService());
         Mockito.when(sinkInitContext.getUserCodeClassLoader())
@@ -176,21 +173,6 @@
         assertThat(called).isTrue();
     }
 
-    private static class DummyMailboxExecutor implements MailboxExecutor {
-        private DummyMailboxExecutor() {}
-
-        public void execute(
-                ThrowingRunnable<? extends Exception> command,
-                String descriptionFormat,
-                Object... descriptionArgs) {}
-
-        public void yield() throws InterruptedException, FlinkRuntimeException {}
-
-        public boolean tryYield() throws FlinkRuntimeException {
-            return false;
-        }
-    }
-
     private OpensearchSinkBuilder<Object> createEmptyBuilder() {
         return new OpensearchSinkBuilder<>();
     }
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
index fa7fc16..dd7c545 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.connector.opensearch.sink;
 
-import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.opensearch.OpensearchUtil;
@@ -27,11 +26,10 @@
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.apache.http.HttpHost;
 import org.junit.jupiter.api.AfterEach;
@@ -331,7 +329,7 @@
                         null,
                         true),
                 metricGroup,
-                new TestMailbox(),
+                new SyncMailboxExecutor(),
                 new DefaultRestClientFactory(),
                 new DefaultBulkResponseInspector(failureHandler));
     }
@@ -376,29 +374,4 @@
             }
         }
     }
-
-    private static class TestMailbox implements MailboxExecutor {
-
-        @Override
-        public void execute(
-                ThrowingRunnable<? extends Exception> command,
-                String descriptionFormat,
-                Object... descriptionArgs) {
-            try {
-                command.run();
-            } catch (Exception e) {
-                throw new RuntimeException("Unexpected error", e);
-            }
-        }
-
-        @Override
-        public void yield() throws InterruptedException, FlinkRuntimeException {
-            Thread.sleep(100);
-        }
-
-        @Override
-        public boolean tryYield() throws FlinkRuntimeException {
-            return false;
-        }
-    }
 }
diff --git a/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java b/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java
index 92d0c09..b092d1a 100644
--- a/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java
+++ b/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.connector.opensearch.sink;
 
-import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.opensearch.OpensearchUtil;
@@ -27,11 +26,10 @@
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.apache.http.HttpHost;
 import org.junit.jupiter.api.AfterEach;
@@ -331,7 +329,7 @@
                         null,
                         true),
                 metricGroup,
-                new TestMailbox(),
+                new SyncMailboxExecutor(),
                 new DefaultRestClientFactory(),
                 new DefaultBulkResponseInspector(failureHandler));
     }
@@ -376,29 +374,4 @@
             }
         }
     }
-
-    private static class TestMailbox implements MailboxExecutor {
-
-        @Override
-        public void execute(
-                ThrowingRunnable<? extends Exception> command,
-                String descriptionFormat,
-                Object... descriptionArgs) {
-            try {
-                command.run();
-            } catch (Exception e) {
-                throw new RuntimeException("Unexpected error", e);
-            }
-        }
-
-        @Override
-        public void yield() throws InterruptedException, FlinkRuntimeException {
-            Thread.sleep(100);
-        }
-
-        @Override
-        public boolean tryYield() throws FlinkRuntimeException {
-            return false;
-        }
-    }
 }
diff --git a/pom.xml b/pom.xml
index e775f25..3ce4108 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
 	<parent>
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-connector-parent</artifactId>
-		<version>1.0.0</version>
+		<version>1.1.0</version>
 	</parent>
 
 	<modelVersion>4.0.0</modelVersion>
@@ -55,7 +55,7 @@
 	</modules>
 
 	<properties>
-		<flink.version>1.18.0</flink.version>
+		<flink.version>1.20.0</flink.version>
 		<commons-compress.version>1.26.1</commons-compress.version>
 		<jackson-bom.version>2.16.2</jackson-bom.version>
 		<junit5.version>5.10.2</junit5.version>
@@ -64,7 +64,7 @@
 		<mockito.version>3.12.4</mockito.version>
 
 		<japicmp.skip>false</japicmp.skip>
-		<japicmp.referenceVersion>1.0.0-1.16</japicmp.referenceVersion>
+		<japicmp.referenceVersion>1.0.0-1.19</japicmp.referenceVersion>
 
 		<slf4j.version>1.7.36</slf4j.version>
 		<log4j.version>2.17.2</log4j.version>
@@ -300,7 +300,7 @@
 			<dependency>
 				<groupId>net.bytebuddy</groupId>
 				<artifactId>byte-buddy</artifactId>
-				<version>1.14.13</version>
+				<version>1.14.19</version>
 			</dependency>
 
 			<!-- For dependency convergence -->
@@ -373,15 +373,8 @@
 			<dependency>
 				<!-- mockito/powermock mismatch -->
 				<groupId>net.bytebuddy</groupId>
-				<artifactId>byte-buddy</artifactId>
-				<version>1.14.10</version>
-			</dependency>
-
-			<dependency>
-				<!-- mockito/powermock mismatch -->
-				<groupId>net.bytebuddy</groupId>
 				<artifactId>byte-buddy-agent</artifactId>
-				<version>1.14.10</version>
+				<version>1.14.19</version>
 			</dependency>
 		</dependencies>
 	</dependencyManagement>
@@ -409,6 +402,24 @@
 				<module>flink-sql-connector-opensearch2</module>
 			</modules>
 		</profile>
+		<!-- The spotless comes from flink-connector-parent and needs version bump to support JDK-21 -->
+		<profile>
+			<id>spotless</id>
+			<activation>
+				<jdk>[21,)</jdk>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>com.diffplug.spotless</groupId>
+						<artifactId>spotless-maven-plugin</artifactId>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
 	</profiles>
 
 	<build>