[FLINK-20987] Upgrade to Flink 1.12.1
This closes #192.
diff --git a/docs/_config.yml b/docs/_config.yml
index 0c1b6ef..05841e7 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -32,7 +32,7 @@
# release this should be the same as the regular version
version_title: "2.3-SNAPSHOT"
# The Flink version supported by this version of Stateful Functions
-flink_version: "1.11.3"
+flink_version: "1.12.1"
# Branch on Github for this version
github_branch: "master"
diff --git a/pom.xml b/pom.xml
index 3d826f8..196a2b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@
<protobuf.version>3.7.1</protobuf.version>
<unixsocket.version>2.3.2</unixsocket.version>
<protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
- <flink.version>1.11.3</flink.version>
+ <flink.version>1.12.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<root.dir>${rootDir}</root.dir>
</properties>
diff --git a/statefun-flink/statefun-flink-common/pom.xml b/statefun-flink/statefun-flink-common/pom.xml
index 82006b8..f4ef3f5 100644
--- a/statefun-flink/statefun-flink-common/pom.xml
+++ b/statefun-flink/statefun-flink-common/pom.xml
@@ -45,7 +45,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
- <version>2.10.1-11.0</version>
+ <version>2.10.1-12.0</version>
</dependency>
<!-- tests -->
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index ac2065e..3b5dace 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -91,7 +91,8 @@
FlinkUserCodeClassLoaders.parentFirst(
new URL[0],
StatefulFunctionsJob.class.getClassLoader(),
- FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER);
+ FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER,
+ false);
Thread.currentThread().setContextClassLoader(flinkClassLoader);
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FlinkTimerServiceFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FlinkTimerServiceFactory.java
index f008f87..c55783b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FlinkTimerServiceFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FlinkTimerServiceFactory.java
@@ -23,7 +23,6 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimerSerializer;
import org.apache.flink.streaming.api.operators.Triggerable;
final class FlinkTimerServiceFactory implements TimerServiceFactory {
@@ -41,10 +40,11 @@
@Override
public InternalTimerService<VoidNamespace> createTimerService(
Triggerable<String, VoidNamespace> triggerable) {
- final TimerSerializer<String, VoidNamespace> timerSerializer =
- new TimerSerializer<>(StringSerializer.INSTANCE, VoidNamespaceSerializer.INSTANCE);
return timeServiceManager.getInternalTimerService(
- DELAYED_MSG_TIMER_SERVICE_NAME, timerSerializer, triggerable);
+ DELAYED_MSG_TIMER_SERVICE_NAME,
+ StringSerializer.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ triggerable);
}
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index d389841..ab2a8f7 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -53,6 +53,7 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -236,11 +237,6 @@
}
@Override
- public Map<String, Accumulator<?, ?>> getAllAccumulators() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public IntCounter getIntCounter(String name) {
throw new UnsupportedOperationException();
}
@@ -301,6 +297,11 @@
AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void registerUserCodeClassLoaderReleaseHookIfAbsent(String s, Runnable runnable) {
+ throw new UnsupportedOperationException();
+ }
}
private static final class FakeKeyedStateBackend implements KeyedStateBackend<Object> {
@@ -369,6 +370,11 @@
public TypeSerializer<Object> getKeySerializer() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public <N> Stream<Tuple2<Object, N>> getKeysAndNamespaces(String state) {
+ throw new UnsupportedOperationException();
+ }
}
private static final class FakeTimerServiceFactory implements TimerServiceFactory {
diff --git a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
index ec91aec..a854701 100644
--- a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
+++ b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
@@ -9,10 +9,10 @@
- com.kohlschutter.junixsocket:junixsocket-core:2.3.2
- com.kohlschutter.junixsocket:junixsocket-common:2.3.2
- com.kohlschutter.junixsocket:junixsocket-native-common:2.3.2
-- commons-codec:commons-codec:1.10
+- commons-codec:commons-codec:1.13
- commons-lang:commons-lang:2.6
- commons-logging:commons-logging:1.1.3
-- commons-io:commons-io:2.4
+- commons-io:commons-io:2.7
- com.google.auto.service:auto-service-annotations:1.0-rc6
- com.google.guava:guava:18.0
- org.apache.commons:commons-lang3:3.3.2
diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile
index 9b844c5..155755d 100644
--- a/tools/docker/Dockerfile
+++ b/tools/docker/Dockerfile
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM flink:1.11.3-scala_2.12-java8
+FROM apache/flink:1.12.1-scala_2.12-java8
ENV ROLE worker
ENV MASTER_HOST localhost