[FLINK-36633] Add support for Flink 1.20 in Flink OpenSearch connector
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index 6db10f2..b8e7cf5 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -37,7 +37,12 @@
jdk: '8, 11, 17, 21',
branch: main
}, {
- flink: 1.20-SNAPSHOT,
+ flink: 1.20-SNAPSHOT,
+ jdk: '8, 11, 17, 21',
+ branch: main
+ },
+ {
+ flink: 1.20,
jdk: '8, 11, 17, 21',
branch: main
}, {
diff --git a/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java b/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java
index 1ee0b2e..57bcc85 100644
--- a/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java
+++ b/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java
@@ -22,6 +22,6 @@
* integration tests.
*/
public class DockerImageVersions {
- public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.15";
- public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.13.0";
+ public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.19";
+ public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.17.1";
}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
index cf35411..bb38777 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
@@ -17,17 +17,15 @@
package org.apache.flink.connector.opensearch.sink;
-import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.http.HttpHost;
import org.junit.jupiter.api.DynamicTest;
@@ -163,8 +161,7 @@
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
new UnregisteredMetricsGroup()));
- Mockito.when(sinkInitContext.getMailboxExecutor())
- .thenReturn(new OpensearchSinkBuilderTest.DummyMailboxExecutor());
+ Mockito.when(sinkInitContext.getMailboxExecutor()).thenReturn(new SyncMailboxExecutor());
Mockito.when(sinkInitContext.getProcessingTimeService())
.thenReturn(new TestProcessingTimeService());
Mockito.when(sinkInitContext.getUserCodeClassLoader())
@@ -176,21 +173,6 @@
assertThat(called).isTrue();
}
- private static class DummyMailboxExecutor implements MailboxExecutor {
- private DummyMailboxExecutor() {}
-
- public void execute(
- ThrowingRunnable<? extends Exception> command,
- String descriptionFormat,
- Object... descriptionArgs) {}
-
- public void yield() throws InterruptedException, FlinkRuntimeException {}
-
- public boolean tryYield() throws FlinkRuntimeException {
- return false;
- }
- }
-
private OpensearchSinkBuilder<Object> createEmptyBuilder() {
return new OpensearchSinkBuilder<>();
}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
index fa7fc16..dd7c545 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
@@ -17,7 +17,6 @@
package org.apache.flink.connector.opensearch.sink;
-import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.opensearch.OpensearchUtil;
@@ -27,11 +26,10 @@
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.http.HttpHost;
import org.junit.jupiter.api.AfterEach;
@@ -331,7 +329,7 @@
null,
true),
metricGroup,
- new TestMailbox(),
+ new SyncMailboxExecutor(),
new DefaultRestClientFactory(),
new DefaultBulkResponseInspector(failureHandler));
}
@@ -376,29 +374,4 @@
}
}
}
-
- private static class TestMailbox implements MailboxExecutor {
-
- @Override
- public void execute(
- ThrowingRunnable<? extends Exception> command,
- String descriptionFormat,
- Object... descriptionArgs) {
- try {
- command.run();
- } catch (Exception e) {
- throw new RuntimeException("Unexpected error", e);
- }
- }
-
- @Override
- public void yield() throws InterruptedException, FlinkRuntimeException {
- Thread.sleep(100);
- }
-
- @Override
- public boolean tryYield() throws FlinkRuntimeException {
- return false;
- }
- }
}
diff --git a/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java b/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java
index 92d0c09..b092d1a 100644
--- a/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java
+++ b/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java
@@ -17,7 +17,6 @@
package org.apache.flink.connector.opensearch.sink;
-import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.opensearch.OpensearchUtil;
@@ -27,11 +26,10 @@
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.http.HttpHost;
import org.junit.jupiter.api.AfterEach;
@@ -331,7 +329,7 @@
null,
true),
metricGroup,
- new TestMailbox(),
+ new SyncMailboxExecutor(),
new DefaultRestClientFactory(),
new DefaultBulkResponseInspector(failureHandler));
}
@@ -376,29 +374,4 @@
}
}
}
-
- private static class TestMailbox implements MailboxExecutor {
-
- @Override
- public void execute(
- ThrowingRunnable<? extends Exception> command,
- String descriptionFormat,
- Object... descriptionArgs) {
- try {
- command.run();
- } catch (Exception e) {
- throw new RuntimeException("Unexpected error", e);
- }
- }
-
- @Override
- public void yield() throws InterruptedException, FlinkRuntimeException {
- Thread.sleep(100);
- }
-
- @Override
- public boolean tryYield() throws FlinkRuntimeException {
- return false;
- }
- }
}
diff --git a/pom.xml b/pom.xml
index e775f25..3ce4108 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-parent</artifactId>
- <version>1.0.0</version>
+ <version>1.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -55,7 +55,7 @@
</modules>
<properties>
- <flink.version>1.18.0</flink.version>
+ <flink.version>1.20.0</flink.version>
<commons-compress.version>1.26.1</commons-compress.version>
<jackson-bom.version>2.16.2</jackson-bom.version>
<junit5.version>5.10.2</junit5.version>
@@ -64,7 +64,7 @@
<mockito.version>3.12.4</mockito.version>
<japicmp.skip>false</japicmp.skip>
- <japicmp.referenceVersion>1.0.0-1.16</japicmp.referenceVersion>
+ <japicmp.referenceVersion>1.0.0-1.19</japicmp.referenceVersion>
<slf4j.version>1.7.36</slf4j.version>
<log4j.version>2.17.2</log4j.version>
@@ -300,7 +300,7 @@
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
- <version>1.14.13</version>
+ <version>1.14.19</version>
</dependency>
<!-- For dependency convergence -->
@@ -373,15 +373,8 @@
<dependency>
<!-- mockito/powermock mismatch -->
<groupId>net.bytebuddy</groupId>
- <artifactId>byte-buddy</artifactId>
- <version>1.14.10</version>
- </dependency>
-
- <dependency>
- <!-- mockito/powermock mismatch -->
- <groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-agent</artifactId>
- <version>1.14.10</version>
+ <version>1.14.19</version>
</dependency>
</dependencies>
</dependencyManagement>
@@ -409,6 +402,24 @@
<module>flink-sql-connector-opensearch2</module>
</modules>
</profile>
+ <!-- The spotless comes from flink-connector-parent and needs version bump to support JDK-21 -->
+ <profile>
+ <id>spotless</id>
+ <activation>
+ <jdk>[21,)</jdk>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
<build>