[FLINK-31619] Upgrade Stateful Functions to Flink 1.16.2. This closes #331

* [FLINK-31619] Upgrade Stateful Functions to Flink 1.16.1

Upgrade Flink version to 1.16.2 (this has been released since the issue was created) and update dependency-confict resolutions.
Handle org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders moving to org.apache.flink.util.FlinkUserCodeClassLoaders.
Fix ReductionsTest.java.
Modify construction of Netty request headers
diff --git a/pom.xml b/pom.xml
index 6bf1c79..1b9935c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
         <protobuf.version>3.7.1</protobuf.version>
         <unixsocket.version>2.3.2</unixsocket.version>
         <protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
-        <flink.version>1.15.2</flink.version>
+        <flink.version>1.16.2</flink.version>
         <scala.binary.version>2.12</scala.binary.version>
         <scala.version>2.12.7</scala.version>
         <lz4-java.version>1.8.0</lz4-java.version>
@@ -110,11 +110,12 @@
                 <version>1.3</version>
                 <scope>test</scope>
             </dependency>
+
             <!--
                 Resolve dependency convergence issue:
-                flink-core:1.15.2 depends on kryo:2.24.0
-                flink-java:1.15.2 depends on kryo:2.21 (via com.twitter:chill-java:0.7.6)
-             -->
+                flink-core:1.16.2 depends on kryo:2.24.0
+                flink-java:1.16.2 depends on kryo:2.21 (via com.twitter:chill-java:0.7.6)
+            -->
             <dependency>
                 <groupId>com.esotericsoftware.kryo</groupId>
                 <artifactId>kryo</artifactId>
@@ -122,14 +123,14 @@
             </dependency>
             <!--
                 Resolve dependency convergence issue:
-                flink-connector-kinesis:1.15.2 depends on jackson-databind:2.13.2.2
-                flink-connector-kinesis:1.15.2 depends on jackson-databind:2.13.2
-                (via com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.2)
+                flink-connector-kinesis:1.16.2 depends on jackson-databind:2.13.4.2
+                flink-connector-kinesis:1.16.2 depends on jackson-databind:2.13.4
+                (via com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.4)
              -->
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
                 <artifactId>jackson-databind</artifactId>
-                <version>2.13.2.2</version>
+                <version>2.13.4.2</version>
             </dependency>
         </dependencies>
     </dependencyManagement>
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index c86ef58..6803ce6 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -23,12 +23,12 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.FlinkUserCodeClassLoader;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 
 public class StatefulFunctionsJob {
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
index 43a5dc9..5bdeeca 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
@@ -132,7 +132,7 @@
     if (cachedHeaders != null) {
       headers = cachedHeaders;
     } else {
-      headers = new DefaultHttpHeaders(false);
+      headers = new DefaultHttpHeaders();
       headers.add(req.headers());
       this.cachedHeaders = headers;
     }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index ad65827..9c36db7 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -21,54 +21,24 @@
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.stream.Stream;
 import javax.annotation.Nonnull;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.*;
 import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.state.*;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.*;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.Keyed;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KeyedStateFunction;
-import org.apache.flink.runtime.state.PriorityComparable;
-import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
-import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.*;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors;
@@ -352,10 +322,13 @@
 
     @Nonnull
     @Override
-    public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
-        @Nonnull TypeSerializer<N> namespaceSerializer,
-        @Nonnull StateDescriptor<S, SV> stateDesc,
-        @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
+    public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
+        @Nonnull TypeSerializer<N> typeSerializer,
+        @Nonnull StateDescriptor<S, SV> stateDescriptor,
+        @Nonnull
+            StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
+                stateSnapshotTransformFactory)
+        throws Exception {
       throw new UnsupportedOperationException();
     }
 
diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile
index 662d034..8c84423 100644
--- a/tools/docker/Dockerfile
+++ b/tools/docker/Dockerfile
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM apache/flink:1.15.2-scala_2.12-java8
+FROM apache/flink:1.16.2-scala_2.12-java8
 
 ENV ROLE worker
 ENV MASTER_HOST localhost