[FLINK-20987] Upgrade to Flink 1.12.1

This closes #192.
diff --git a/docs/_config.yml b/docs/_config.yml
index 0c1b6ef..05841e7 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -32,7 +32,7 @@
 # release this should be the same as the regular version
 version_title: "2.3-SNAPSHOT"
 # The Flink version supported by this version of Stateful Functions
-flink_version: "1.11.3"
+flink_version: "1.12.1"
 # Branch on Github for this version
 github_branch: "master"
 
diff --git a/pom.xml b/pom.xml
index 3d826f8..196a2b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@
         <protobuf.version>3.7.1</protobuf.version>
         <unixsocket.version>2.3.2</unixsocket.version>
         <protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
-        <flink.version>1.11.3</flink.version>
+        <flink.version>1.12.1</flink.version>
         <scala.binary.version>2.12</scala.binary.version>
         <root.dir>${rootDir}</root.dir>
     </properties>
diff --git a/statefun-flink/statefun-flink-common/pom.xml b/statefun-flink/statefun-flink-common/pom.xml
index 82006b8..f4ef3f5 100644
--- a/statefun-flink/statefun-flink-common/pom.xml
+++ b/statefun-flink/statefun-flink-common/pom.xml
@@ -45,7 +45,7 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-jackson</artifactId>
-            <version>2.10.1-11.0</version>
+            <version>2.10.1-12.0</version>
         </dependency>
 
         <!-- tests -->
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index ac2065e..3b5dace 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -91,7 +91,8 @@
           FlinkUserCodeClassLoaders.parentFirst(
               new URL[0],
               StatefulFunctionsJob.class.getClassLoader(),
-              FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER);
+              FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER,
+              false);
       Thread.currentThread().setContextClassLoader(flinkClassLoader);
     }
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FlinkTimerServiceFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FlinkTimerServiceFactory.java
index f008f87..c55783b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FlinkTimerServiceFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FlinkTimerServiceFactory.java
@@ -23,7 +23,6 @@
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimerSerializer;
 import org.apache.flink.streaming.api.operators.Triggerable;
 
 final class FlinkTimerServiceFactory implements TimerServiceFactory {
@@ -41,10 +40,11 @@
   @Override
   public InternalTimerService<VoidNamespace> createTimerService(
       Triggerable<String, VoidNamespace> triggerable) {
-    final TimerSerializer<String, VoidNamespace> timerSerializer =
-        new TimerSerializer<>(StringSerializer.INSTANCE, VoidNamespaceSerializer.INSTANCE);
 
     return timeServiceManager.getInternalTimerService(
-        DELAYED_MSG_TIMER_SERVICE_NAME, timerSerializer, triggerable);
+        DELAYED_MSG_TIMER_SERVICE_NAME,
+        StringSerializer.INSTANCE,
+        VoidNamespaceSerializer.INSTANCE,
+        triggerable);
   }
 }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index d389841..ab2a8f7 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -53,6 +53,7 @@
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -236,11 +237,6 @@
     }
 
     @Override
-    public Map<String, Accumulator<?, ?>> getAllAccumulators() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
     public IntCounter getIntCounter(String name) {
       throw new UnsupportedOperationException();
     }
@@ -301,6 +297,11 @@
         AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void registerUserCodeClassLoaderReleaseHookIfAbsent(String s, Runnable runnable) {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private static final class FakeKeyedStateBackend implements KeyedStateBackend<Object> {
@@ -369,6 +370,11 @@
     public TypeSerializer<Object> getKeySerializer() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public <N> Stream<Tuple2<Object, N>> getKeysAndNamespaces(String state) {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private static final class FakeTimerServiceFactory implements TimerServiceFactory {
diff --git a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
index ec91aec..a854701 100644
--- a/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
+++ b/statefun-flink/statefun-flink-distribution/src/main/resources/META-INF/NOTICE
@@ -9,10 +9,10 @@
 - com.kohlschutter.junixsocket:junixsocket-core:2.3.2
 - com.kohlschutter.junixsocket:junixsocket-common:2.3.2
 - com.kohlschutter.junixsocket:junixsocket-native-common:2.3.2
-- commons-codec:commons-codec:1.10
+- commons-codec:commons-codec:1.13
 - commons-lang:commons-lang:2.6
 - commons-logging:commons-logging:1.1.3
-- commons-io:commons-io:2.4
+- commons-io:commons-io:2.7
 - com.google.auto.service:auto-service-annotations:1.0-rc6
 - com.google.guava:guava:18.0
 - org.apache.commons:commons-lang3:3.3.2
diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile
index 9b844c5..155755d 100644
--- a/tools/docker/Dockerfile
+++ b/tools/docker/Dockerfile
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM flink:1.11.3-scala_2.12-java8
+FROM apache/flink:1.12.1-scala_2.12-java8
 
 ENV ROLE worker
 ENV MASTER_HOST localhost