[FLINK-31619] Upgrade Stateful Functions to Flink 1.16.2. This closes #331
* [FLINK-31619] Upgrade Stateful Functions to Flink 1.16.1
Upgrade Flink version to 1.16.2 (this has been released since the issue was created) and update dependency-confict resolutions.
Handle org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders moving to org.apache.flink.util.FlinkUserCodeClassLoaders.
Fix ReductionsTest.java.
Modify construction of Netty request headers
diff --git a/pom.xml b/pom.xml
index 6bf1c79..1b9935c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
<protobuf.version>3.7.1</protobuf.version>
<unixsocket.version>2.3.2</unixsocket.version>
<protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
- <flink.version>1.15.2</flink.version>
+ <flink.version>1.16.2</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.7</scala.version>
<lz4-java.version>1.8.0</lz4-java.version>
@@ -110,11 +110,12 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
+
<!--
Resolve dependency convergence issue:
- flink-core:1.15.2 depends on kryo:2.24.0
- flink-java:1.15.2 depends on kryo:2.21 (via com.twitter:chill-java:0.7.6)
- -->
+ flink-core:1.16.2 depends on kryo:2.24.0
+ flink-java:1.16.2 depends on kryo:2.21 (via com.twitter:chill-java:0.7.6)
+ -->
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
@@ -122,14 +123,14 @@
</dependency>
<!--
Resolve dependency convergence issue:
- flink-connector-kinesis:1.15.2 depends on jackson-databind:2.13.2.2
- flink-connector-kinesis:1.15.2 depends on jackson-databind:2.13.2
- (via com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.2)
+ flink-connector-kinesis:1.16.2 depends on jackson-databind:2.13.4.2
+ flink-connector-kinesis:1.16.2 depends on jackson-databind:2.13.4
+ (via com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.4)
-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>2.13.2.2</version>
+ <version>2.13.4.2</version>
</dependency>
</dependencies>
</dependencyManagement>
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index c86ef58..6803ce6 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -23,12 +23,12 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.FlinkUserCodeClassLoader;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
public class StatefulFunctionsJob {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
index 43a5dc9..5bdeeca 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
@@ -132,7 +132,7 @@
if (cachedHeaders != null) {
headers = cachedHeaders;
} else {
- headers = new DefaultHttpHeaders(false);
+ headers = new DefaultHttpHeaders();
headers.add(req.headers());
this.cachedHeaders = headers;
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index ad65827..9c36db7 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -21,54 +21,24 @@
import static org.junit.Assert.assertThat;
import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.*;
import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.*;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.Keyed;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KeyedStateFunction;
-import org.apache.flink.runtime.state.PriorityComparable;
-import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
-import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.*;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors;
@@ -352,10 +322,13 @@
@Nonnull
@Override
- public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
- @Nonnull TypeSerializer<N> namespaceSerializer,
- @Nonnull StateDescriptor<S, SV> stateDesc,
- @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
+ public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
+ @Nonnull TypeSerializer<N> typeSerializer,
+ @Nonnull StateDescriptor<S, SV> stateDescriptor,
+ @Nonnull
+ StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
+ stateSnapshotTransformFactory)
+ throws Exception {
throw new UnsupportedOperationException();
}
diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile
index 662d034..8c84423 100644
--- a/tools/docker/Dockerfile
+++ b/tools/docker/Dockerfile
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM apache/flink:1.15.2-scala_2.12-java8
+FROM apache/flink:1.16.2-scala_2.12-java8
ENV ROLE worker
ENV MASTER_HOST localhost