[FLINK-36308][cep] Remove deprecated APIs from CEP module (#25381)
diff --git a/docs/content.zh/docs/libs/cep.md b/docs/content.zh/docs/libs/cep.md
index b77d047..fbf29cd 100644
--- a/docs/content.zh/docs/libs/cep.md
+++ b/docs/content.zh/docs/libs/cep.md
@@ -492,7 +492,7 @@
 {{< tabs "8228f5b0-b6b3-4ca6-a56b-e5a8fd5fdc3b" >}}
 {{< tab "Java" >}}
 ```java
-next.within(Time.seconds(10));
+next.within(Duration.ofSeconds(10));
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -508,7 +508,7 @@
     .where(SimpleCondition.of(value -> value.getName().equals("a")))
     .notFollowedBy("end")
     .where(SimpleCondition.of(value -> value.getName().equals("b")))
-    .within(Time.seconds(10));
+    .within(Duration.ofSeconds(10));
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -757,7 +757,7 @@
         <p>定义匹配模式的事件序列出现的最大时间间隔。如果未完成的事件序列超过了这个事件,就会被丢弃:</p>
 
 ```java
-pattern.within(Time.seconds(10));
+pattern.within(Duration.ofSeconds(10));
 ```
 </td>
 </tr>
@@ -1151,7 +1151,7 @@
     .where(SimpleCondition.of(value -> value.getName().equals("error")))
     .followedBy("end")
     .where(SimpleCondition.of(value -> value.getName().equals("critical")))
-    .within(Time.seconds(10));
+    .within(Duration.ofSeconds(10));
 
 PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
 
diff --git a/docs/content/docs/libs/cep.md b/docs/content/docs/libs/cep.md
index c951595..3f50f13 100644
--- a/docs/content/docs/libs/cep.md
+++ b/docs/content/docs/libs/cep.md
@@ -670,12 +670,12 @@
 {{< tabs "df27eb6d-c532-430a-b56f-98ad4082e6d5" >}}
 {{< tab "Java" >}}
 ```java
-next.within(Time.seconds(10));
+next.within(Duration.ofSeconds(10));
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-next.within(Time.seconds(10))
+next.within(Duration.ofSeconds(10))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -691,14 +691,14 @@
     .where(SimpleCondition.of(value -> value.getName().equals("a")))
     .notFollowedBy("end")
     .where(SimpleCondition.of(value -> value.getName().equals("b")))
-    .within(Time.seconds(10));
+    .within(Duration.ofSeconds(10));
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
 Pattern.begin("start").where(_.getName().equals("a"))
 .notFollowedBy("end").where(_.getName == "b")
-.within(Time.seconds(10))
+.within(Duration.ofSeconds(10))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1049,12 +1049,12 @@
 {{< tabs within >}}
 {{< tab "Java" >}}
 ```java
-pattern.within(Time.seconds(10));
+pattern.within(Duration.ofSeconds(10));
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-pattern.within(Time.seconds(10))
+pattern.within(Duration.ofSeconds(10))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1508,7 +1508,7 @@
     .where(SimpleCondition.of(value -> value.getName().equals("error")))
     .followedBy("end")
     .where(SimpleCondition.of(value -> value.getName().equals("critical")))
-    .within(Time.seconds(10));
+    .within(Duration.ofSeconds(10));
 
 PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
 
@@ -1531,7 +1531,7 @@
 val pattern = Pattern.begin[Event]("start")
   .next("middle").where(_.getName == "error")
   .followedBy("end").where(_.getName == "critical")
-  .within(Time.seconds(10))
+  .within(Duration.ofSeconds(10))
 
 val patternStream = CEP.pattern(partitionedInput, pattern)
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 9e9f423..78fe317 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -18,21 +18,15 @@
 
 package org.apache.flink.cep;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.cep.functions.PatternProcessFunction;
 import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.types.Either;
 import org.apache.flink.util.OutputTag;
 
-import java.util.UUID;
-
 import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromFlatSelect;
 import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromSelect;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -275,70 +269,6 @@
     }
 
     /**
-     * Applies a select function to the detected pattern sequence. For each pattern sequence the
-     * provided {@link PatternSelectFunction} is called. The pattern select function can produce
-     * exactly one resulting element.
-     *
-     * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
-     * partial pattern sequence the provided {@link PatternTimeoutFunction} is called. The pattern
-     * timeout function can produce exactly one resulting element.
-     *
-     * @param patternTimeoutFunction The pattern timeout function which is called for each partial
-     *     pattern sequence which has timed out.
-     * @param patternSelectFunction The pattern select function which is called for each detected
-     *     pattern sequence.
-     * @param <L> Type of the resulting timeout elements
-     * @param <R> Type of the resulting elements
-     * @deprecated Use {@link PatternStream#select(OutputTag, PatternTimeoutFunction,
-     *     PatternSelectFunction)} that returns timed out events as a side-output
-     * @return {@link DataStream} which contains the resulting elements or the resulting timeout
-     *     elements wrapped in an {@link Either} type.
-     */
-    @Deprecated
-    public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
-            final PatternTimeoutFunction<T, L> patternTimeoutFunction,
-            final PatternSelectFunction<T, R> patternSelectFunction) {
-
-        final TypeInformation<R> mainTypeInfo =
-                TypeExtractor.getUnaryOperatorReturnType(
-                        patternSelectFunction,
-                        PatternSelectFunction.class,
-                        0,
-                        1,
-                        TypeExtractor.NO_INDEX,
-                        builder.getInputType(),
-                        null,
-                        false);
-
-        final TypeInformation<L> timeoutTypeInfo =
-                TypeExtractor.getUnaryOperatorReturnType(
-                        patternTimeoutFunction,
-                        PatternTimeoutFunction.class,
-                        0,
-                        1,
-                        TypeExtractor.NO_INDEX,
-                        builder.getInputType(),
-                        null,
-                        false);
-
-        final TypeInformation<Either<L, R>> outTypeInfo =
-                new EitherTypeInfo<>(timeoutTypeInfo, mainTypeInfo);
-
-        final OutputTag<L> outputTag =
-                new OutputTag<>(UUID.randomUUID().toString(), timeoutTypeInfo);
-
-        final PatternProcessFunction<T, R> processFunction =
-                fromSelect(builder.clean(patternSelectFunction))
-                        .withTimeoutHandler(outputTag, builder.clean(patternTimeoutFunction))
-                        .build();
-
-        final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
-        final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
-
-        return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
-    }
-
-    /**
      * Applies a flat select function to the detected pattern sequence. For each pattern sequence
      * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
      * can produce an arbitrary number of resulting elements.
@@ -479,85 +409,4 @@
 
         return process(processFunction, outTypeInfo);
     }
-
-    /**
-     * Applies a flat select function to the detected pattern sequence. For each pattern sequence
-     * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
-     * can produce an arbitrary number of resulting elements.
-     *
-     * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
-     * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The
-     * pattern timeout function can produce an arbitrary number of resulting elements.
-     *
-     * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
-     *     partial pattern sequence which has timed out.
-     * @param patternFlatSelectFunction The pattern flat select function which is called for each
-     *     detected pattern sequence.
-     * @param <L> Type of the resulting timeout events
-     * @param <R> Type of the resulting events
-     * @deprecated Use {@link PatternStream#flatSelect(OutputTag, PatternFlatTimeoutFunction,
-     *     PatternFlatSelectFunction)} that returns timed out events as a side-output
-     * @return {@link DataStream} which contains the resulting events from the pattern flat select
-     *     function or the resulting timeout events from the pattern flat timeout function wrapped
-     *     in an {@link Either} type.
-     */
-    @Deprecated
-    public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
-            final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
-            final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
-
-        final TypeInformation<L> timedOutTypeInfo =
-                TypeExtractor.getUnaryOperatorReturnType(
-                        patternFlatTimeoutFunction,
-                        PatternFlatTimeoutFunction.class,
-                        0,
-                        1,
-                        new int[] {2, 0},
-                        builder.getInputType(),
-                        null,
-                        false);
-
-        final TypeInformation<R> mainTypeInfo =
-                TypeExtractor.getUnaryOperatorReturnType(
-                        patternFlatSelectFunction,
-                        PatternFlatSelectFunction.class,
-                        0,
-                        1,
-                        new int[] {1, 0},
-                        builder.getInputType(),
-                        null,
-                        false);
-
-        final OutputTag<L> outputTag =
-                new OutputTag<>(UUID.randomUUID().toString(), timedOutTypeInfo);
-
-        final PatternProcessFunction<T, R> processFunction =
-                fromFlatSelect(builder.clean(patternFlatSelectFunction))
-                        .withTimeoutHandler(outputTag, builder.clean(patternFlatTimeoutFunction))
-                        .build();
-
-        final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
-        final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
-        final TypeInformation<Either<L, R>> outTypeInfo =
-                new EitherTypeInfo<>(timedOutTypeInfo, mainTypeInfo);
-
-        return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
-    }
-
-    /** Used for joining results from timeout side-output for API backward compatibility. */
-    @Internal
-    public static class CoMapTimeout<R, L> implements CoMapFunction<R, L, Either<L, R>> {
-
-        private static final long serialVersionUID = 2059391566945212552L;
-
-        @Override
-        public Either<L, R> map1(R value) {
-            return Either.Right(value);
-        }
-
-        @Override
-        public Either<L, R> map2(L value) {
-            return Either.Left(value);
-        }
-    }
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
deleted file mode 100644
index 4a6134d..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.EnumSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.cep.nfa.sharedbuffer.EventId;
-import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedList;
-import java.util.Queue;
-
-/** Methods for deserialization of old format NFA. */
-class MigrationUtils {
-
-    /**
-     * Skips bytes corresponding to serialized states. In flink 1.6+ the states are no longer kept
-     * in state.
-     */
-    static void skipSerializedStates(DataInputView in) throws IOException {
-        TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE;
-        TypeSerializer<State.StateType> stateTypeSerializer =
-                new EnumSerializer<>(State.StateType.class);
-        TypeSerializer<StateTransitionAction> actionSerializer =
-                new EnumSerializer<>(StateTransitionAction.class);
-
-        final int noOfStates = in.readInt();
-
-        for (int i = 0; i < noOfStates; i++) {
-            nameSerializer.deserialize(in);
-            stateTypeSerializer.deserialize(in);
-        }
-
-        for (int i = 0; i < noOfStates; i++) {
-            String srcName = nameSerializer.deserialize(in);
-
-            int noOfTransitions = in.readInt();
-            for (int j = 0; j < noOfTransitions; j++) {
-                String src = nameSerializer.deserialize(in);
-                Preconditions.checkState(
-                        src.equals(srcName),
-                        "Source Edge names do not match (" + srcName + " - " + src + ").");
-
-                nameSerializer.deserialize(in);
-                actionSerializer.deserialize(in);
-
-                try {
-                    skipCondition(in);
-                } catch (ClassNotFoundException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    private static void skipCondition(DataInputView in) throws IOException, ClassNotFoundException {
-        boolean hasCondition = in.readBoolean();
-        if (hasCondition) {
-            int length = in.readInt();
-
-            byte[] serCondition = new byte[length];
-            in.read(serCondition);
-
-            ByteArrayInputStream bais = new ByteArrayInputStream(serCondition);
-            ObjectInputStream ois = new ObjectInputStream(bais);
-
-            ois.readObject();
-            ois.close();
-            bais.close();
-        }
-    }
-
-    static <T> Queue<ComputationState> deserializeComputationStates(
-            org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer,
-            TypeSerializer<T> eventSerializer,
-            DataInputView source)
-            throws IOException {
-
-        Queue<ComputationState> computationStates = new LinkedList<>();
-        StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
-        LongSerializer timestampSerializer = LongSerializer.INSTANCE;
-        DeweyNumber.DeweyNumberSerializer versionSerializer =
-                DeweyNumber.DeweyNumberSerializer.INSTANCE;
-
-        int computationStateNo = source.readInt();
-        for (int i = 0; i < computationStateNo; i++) {
-            String state = stateNameSerializer.deserialize(source);
-            String prevState = stateNameSerializer.deserialize(source);
-            long timestamp = timestampSerializer.deserialize(source);
-            DeweyNumber version = versionSerializer.deserialize(source);
-            long startTimestamp = timestampSerializer.deserialize(source);
-            int counter = source.readInt();
-
-            T event = null;
-            if (source.readBoolean()) {
-                event = eventSerializer.deserialize(source);
-            }
-
-            NodeId nodeId;
-            EventId startEventId;
-            if (prevState != null) {
-                nodeId = sharedBuffer.getNodeId(prevState, timestamp, counter, event);
-                startEventId = sharedBuffer.getStartEventId(version.getRun());
-            } else {
-                nodeId = null;
-                startEventId = null;
-            }
-
-            computationStates.add(
-                    ComputationState.createState(
-                            state, nodeId, version, startTimestamp, -1L, startEventId));
-        }
-        return computationStates;
-    }
-
-    private MigrationUtils() {}
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 841d295..feef013 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -22,10 +22,6 @@
 import org.apache.flink.api.common.functions.DefaultOpenContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -36,14 +32,11 @@
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,8 +49,6 @@
 import java.util.Queue;
 import java.util.Stack;
 
-import static org.apache.flink.cep.nfa.MigrationUtils.deserializeComputationStates;
-
 /**
  * Non-deterministic finite automaton implementation.
  *
@@ -99,7 +90,7 @@
 
     /**
      * The length of a windowed pattern, as specified using the {@link
-     * org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)} method.
+     * org.apache.flink.cep.pattern.Pattern#within(Duration) Pattern.within(Duration)} method.
      */
     private final long windowTime;
 
@@ -952,173 +943,4 @@
             return timerService.currentProcessingTime();
         }
     }
-
-    ////////////////////				DEPRECATED/MIGRATION UTILS
-
-    /** Wrapper for migrated state. */
-    public static class MigratedNFA<T> {
-
-        private final Queue<ComputationState> computationStates;
-        private final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer;
-
-        public org.apache.flink.cep.nfa.SharedBuffer<T> getSharedBuffer() {
-            return sharedBuffer;
-        }
-
-        public Queue<ComputationState> getComputationStates() {
-            return computationStates;
-        }
-
-        MigratedNFA(
-                final Queue<ComputationState> computationStates,
-                final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer) {
-            this.sharedBuffer = sharedBuffer;
-            this.computationStates = computationStates;
-        }
-    }
-
-    /** A {@link TypeSerializerSnapshot} for the legacy {@link NFASerializer}. */
-    @SuppressWarnings("deprecation")
-    public static final class MigratedNFASerializerSnapshot<T>
-            extends CompositeTypeSerializerSnapshot<MigratedNFA<T>, NFASerializer<T>> {
-
-        private static final int VERSION = 2;
-
-        public MigratedNFASerializerSnapshot() {}
-
-        MigratedNFASerializerSnapshot(NFASerializer<T> legacyNfaSerializer) {
-            super(legacyNfaSerializer);
-        }
-
-        @Override
-        protected int getCurrentOuterSnapshotVersion() {
-            return VERSION;
-        }
-
-        @Override
-        protected TypeSerializer<?>[] getNestedSerializers(NFASerializer<T> outerSerializer) {
-            return new TypeSerializer<?>[] {
-                outerSerializer.eventSerializer, outerSerializer.sharedBufferSerializer
-            };
-        }
-
-        @Override
-        protected NFASerializer<T> createOuterSerializerWithNestedSerializers(
-                TypeSerializer<?>[] nestedSerializers) {
-            @SuppressWarnings("unchecked")
-            TypeSerializer<T> eventSerializer = (TypeSerializer<T>) nestedSerializers[0];
-
-            @SuppressWarnings("unchecked")
-            TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer =
-                    (TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>>) nestedSerializers[1];
-
-            return new NFASerializer<>(eventSerializer, sharedBufferSerializer);
-        }
-    }
-
-    /** Only for backward compatibility with <=1.5. */
-    @Deprecated
-    public static class NFASerializer<T> extends TypeSerializer<MigratedNFA<T>> {
-
-        private static final long serialVersionUID = 2098282423980597010L;
-
-        private final TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>>
-                sharedBufferSerializer;
-
-        private final TypeSerializer<T> eventSerializer;
-
-        public NFASerializer(TypeSerializer<T> typeSerializer) {
-            this(
-                    typeSerializer,
-                    new org.apache.flink.cep.nfa.SharedBuffer.SharedBufferSerializer<>(
-                            StringSerializer.INSTANCE, typeSerializer));
-        }
-
-        NFASerializer(
-                TypeSerializer<T> typeSerializer,
-                TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer) {
-            this.eventSerializer = typeSerializer;
-            this.sharedBufferSerializer = sharedBufferSerializer;
-        }
-
-        @Override
-        public boolean isImmutableType() {
-            return false;
-        }
-
-        @Override
-        public NFASerializer<T> duplicate() {
-            return new NFASerializer<>(eventSerializer.duplicate());
-        }
-
-        @Override
-        public MigratedNFA<T> createInstance() {
-            return null;
-        }
-
-        @Override
-        public MigratedNFA<T> copy(MigratedNFA<T> from) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public MigratedNFA<T> copy(MigratedNFA<T> from, MigratedNFA<T> reuse) {
-            return copy(from);
-        }
-
-        @Override
-        public int getLength() {
-            return -1;
-        }
-
-        @Override
-        public void serialize(MigratedNFA<T> record, DataOutputView target) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public MigratedNFA<T> deserialize(DataInputView source) throws IOException {
-            MigrationUtils.skipSerializedStates(source);
-            source.readLong();
-            source.readBoolean();
-
-            org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer =
-                    sharedBufferSerializer.deserialize(source);
-            Queue<ComputationState> computationStates =
-                    deserializeComputationStates(sharedBuffer, eventSerializer, source);
-
-            return new MigratedNFA<>(computationStates, sharedBuffer);
-        }
-
-        @Override
-        public MigratedNFA<T> deserialize(MigratedNFA<T> reuse, DataInputView source)
-                throws IOException {
-            return deserialize(source);
-        }
-
-        @Override
-        public void copy(DataInputView source, DataOutputView target) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            return obj == this
-                    || (obj != null
-                            && obj.getClass().equals(getClass())
-                            && sharedBufferSerializer.equals(
-                                    ((NFASerializer) obj).sharedBufferSerializer)
-                            && eventSerializer.equals(((NFASerializer) obj).eventSerializer));
-        }
-
-        @Override
-        public int hashCode() {
-            return 37 * sharedBufferSerializer.hashCode() + eventSerializer.hashCode();
-        }
-
-        @Override
-        public MigratedNFASerializerSnapshot<T> snapshotConfiguration() {
-            return new MigratedNFASerializerSnapshot<>(this);
-        }
-    }
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
deleted file mode 100644
index 9285a51..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa;
-
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
-import org.apache.flink.cep.nfa.sharedbuffer.EventId;
-import org.apache.flink.cep.nfa.sharedbuffer.Lockable;
-import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * @deprecated everything in this class is deprecated. Those are only migration procedures from
- *     older versions.
- */
-@Deprecated
-public class SharedBuffer<V> {
-
-    private final Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext;
-    /** Run number (first block in DeweyNumber) -> EventId. */
-    private Map<Integer, EventId> starters;
-
-    private final Map<EventId, Lockable<V>> eventsBuffer;
-    private final Map<NodeId, Lockable<SharedBufferNode>> pages;
-
-    public Map<EventId, Lockable<V>> getEventsBuffer() {
-        return eventsBuffer;
-    }
-
-    public Map<NodeId, Lockable<SharedBufferNode>> getPages() {
-        return pages;
-    }
-
-    public SharedBuffer(
-            Map<EventId, Lockable<V>> eventsBuffer,
-            Map<NodeId, Lockable<SharedBufferNode>> pages,
-            Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext,
-            Map<Integer, EventId> starters) {
-
-        this.eventsBuffer = eventsBuffer;
-        this.pages = pages;
-        this.mappingContext = mappingContext;
-        this.starters = starters;
-    }
-
-    public NodeId getNodeId(String prevState, long timestamp, int counter, V event) {
-        return mappingContext.get(
-                Tuple2.of(
-                        NFAStateNameHandler.getOriginalNameFromInternal(prevState),
-                        new ValueTimeWrapper<>(event, timestamp, counter)));
-    }
-
-    public EventId getStartEventId(int run) {
-        return starters.get(run);
-    }
-
-    /**
-     * Wrapper for a value-timestamp pair.
-     *
-     * @param <V> Type of the value
-     */
-    private static class ValueTimeWrapper<V> {
-
-        private final V value;
-        private final long timestamp;
-        private final int counter;
-
-        ValueTimeWrapper(final V value, final long timestamp, final int counter) {
-            this.value = value;
-            this.timestamp = timestamp;
-            this.counter = counter;
-        }
-
-        /**
-         * Returns a counter used to disambiguate between different accepted elements with the same
-         * value and timestamp that refer to the same looping state.
-         */
-        public int getCounter() {
-            return counter;
-        }
-
-        public V getValue() {
-            return value;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-
-        @Override
-        public String toString() {
-            return "ValueTimeWrapper(" + value + ", " + timestamp + ", " + counter + ")";
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (!(obj instanceof ValueTimeWrapper)) {
-                return false;
-            }
-
-            @SuppressWarnings("unchecked")
-            ValueTimeWrapper<V> other = (ValueTimeWrapper<V>) obj;
-
-            return timestamp == other.getTimestamp()
-                    && Objects.equals(value, other.getValue())
-                    && counter == other.getCounter();
-        }
-
-        @Override
-        public int hashCode() {
-            return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode()) + counter);
-        }
-
-        public static <V> ValueTimeWrapper<V> deserialize(
-                final TypeSerializer<V> valueSerializer, final DataInputView source)
-                throws IOException {
-
-            final V value = valueSerializer.deserialize(source);
-            final long timestamp = source.readLong();
-            final int counter = source.readInt();
-
-            return new ValueTimeWrapper<>(value, timestamp, counter);
-        }
-    }
-
-    /** A {@link TypeSerializerSnapshot} for the {@link SharedBufferSerializerSnapshot}. */
-    public static final class SharedBufferSerializerSnapshot<K, V>
-            extends CompositeTypeSerializerSnapshot<SharedBuffer<V>, SharedBufferSerializer<K, V>> {
-
-        private static final int VERSION = 2;
-
-        public SharedBufferSerializerSnapshot() {}
-
-        public SharedBufferSerializerSnapshot(SharedBufferSerializer<K, V> sharedBufferSerializer) {
-            super(sharedBufferSerializer);
-        }
-
-        @Override
-        protected int getCurrentOuterSnapshotVersion() {
-            return VERSION;
-        }
-
-        @Override
-        protected TypeSerializer<?>[] getNestedSerializers(
-                SharedBufferSerializer<K, V> outerSerializer) {
-            return new TypeSerializer<?>[] {
-                outerSerializer.keySerializer,
-                outerSerializer.valueSerializer,
-                outerSerializer.versionSerializer
-            };
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        protected SharedBufferSerializer<K, V> createOuterSerializerWithNestedSerializers(
-                TypeSerializer<?>[] nestedSerializers) {
-            TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
-            TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
-            TypeSerializer<DeweyNumber> versionSerializer =
-                    (TypeSerializer<DeweyNumber>) nestedSerializers[2];
-            return new SharedBufferSerializer<>(keySerializer, valueSerializer, versionSerializer);
-        }
-    }
-
-    /** A {@link TypeSerializer} for the {@link SharedBuffer}. */
-    public static class SharedBufferSerializer<K, V> extends TypeSerializer<SharedBuffer<V>> {
-
-        private static final long serialVersionUID = -3254176794680331560L;
-
-        private final TypeSerializer<K> keySerializer;
-        private final TypeSerializer<V> valueSerializer;
-        private final TypeSerializer<DeweyNumber> versionSerializer;
-
-        public SharedBufferSerializer(
-                final TypeSerializer<K> keySerializer, final TypeSerializer<V> valueSerializer) {
-            this(keySerializer, valueSerializer, DeweyNumber.DeweyNumberSerializer.INSTANCE);
-        }
-
-        public SharedBufferSerializer(
-                final TypeSerializer<K> keySerializer,
-                final TypeSerializer<V> valueSerializer,
-                final TypeSerializer<DeweyNumber> versionSerializer) {
-
-            this.keySerializer = keySerializer;
-            this.valueSerializer = valueSerializer;
-            this.versionSerializer = versionSerializer;
-        }
-
-        public TypeSerializer<DeweyNumber> getVersionSerializer() {
-            return versionSerializer;
-        }
-
-        public TypeSerializer<K> getKeySerializer() {
-            return keySerializer;
-        }
-
-        public TypeSerializer<V> getValueSerializer() {
-            return valueSerializer;
-        }
-
-        @Override
-        public boolean isImmutableType() {
-            return false;
-        }
-
-        @Override
-        public SharedBufferSerializer<K, V> duplicate() {
-            return new SharedBufferSerializer<>(
-                    keySerializer.duplicate(), valueSerializer.duplicate());
-        }
-
-        @Override
-        public SharedBuffer<V> createInstance() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public SharedBuffer<V> copy(SharedBuffer<V> from) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public SharedBuffer<V> copy(SharedBuffer<V> from, SharedBuffer<V> reuse) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public int getLength() {
-            return -1;
-        }
-
-        @Override
-        public void serialize(SharedBuffer<V> record, DataOutputView target) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public SharedBuffer<V> deserialize(DataInputView source) throws IOException {
-            List<Tuple2<NodeId, Lockable<SharedBufferNode>>> entries = new ArrayList<>();
-            Map<ValueTimeWrapper<V>, EventId> values = new HashMap<>();
-            Map<EventId, Lockable<V>> valuesWithIds = new HashMap<>();
-            Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext = new HashMap<>();
-            Map<Long, Integer> totalEventsPerTimestamp = new HashMap<>();
-            int totalPages = source.readInt();
-
-            for (int i = 0; i < totalPages; i++) {
-                // key of the page
-                K stateName = keySerializer.deserialize(source);
-
-                int numberEntries = source.readInt();
-                for (int j = 0; j < numberEntries; j++) {
-                    ValueTimeWrapper<V> wrapper =
-                            ValueTimeWrapper.deserialize(valueSerializer, source);
-                    EventId eventId = values.get(wrapper);
-                    if (eventId == null) {
-                        int id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0);
-                        eventId = new EventId(id, wrapper.timestamp);
-                        values.put(wrapper, eventId);
-                        valuesWithIds.put(eventId, new Lockable<>(wrapper.value, 1));
-                        totalEventsPerTimestamp.computeIfPresent(
-                                wrapper.timestamp, (k, v) -> v + 1);
-                    } else {
-                        Lockable<V> eventWrapper = valuesWithIds.get(eventId);
-                        eventWrapper.lock();
-                    }
-
-                    NodeId nodeId = new NodeId(eventId, (String) stateName);
-                    int refCount = source.readInt();
-
-                    entries.add(
-                            Tuple2.of(nodeId, new Lockable<>(new SharedBufferNode(), refCount)));
-                    mappingContext.put(Tuple2.of((String) stateName, wrapper), nodeId);
-                }
-            }
-
-            // read the edges of the shared buffer entries
-            int totalEdges = source.readInt();
-
-            Map<Integer, EventId> starters = new HashMap<>();
-            for (int j = 0; j < totalEdges; j++) {
-                int sourceIdx = source.readInt();
-
-                int targetIdx = source.readInt();
-
-                DeweyNumber version = versionSerializer.deserialize(source);
-
-                // We've already deserialized the shared buffer entry. Simply read its ID and
-                // retrieve the buffer entry from the list of entries
-                Tuple2<NodeId, Lockable<SharedBufferNode>> sourceEntry = entries.get(sourceIdx);
-                Tuple2<NodeId, Lockable<SharedBufferNode>> targetEntry =
-                        targetIdx < 0 ? Tuple2.of(null, null) : entries.get(targetIdx);
-                sourceEntry.f1.getElement().addEdge(new SharedBufferEdge(targetEntry.f0, version));
-                if (version.length() == 1) {
-                    starters.put(version.getRun(), sourceEntry.f0.getEventId());
-                }
-            }
-
-            Map<NodeId, Lockable<SharedBufferNode>> entriesMap =
-                    entries.stream().collect(Collectors.toMap(e -> e.f0, e -> e.f1));
-
-            return new SharedBuffer<>(valuesWithIds, entriesMap, mappingContext, starters);
-        }
-
-        @Override
-        public SharedBuffer<V> deserialize(SharedBuffer<V> reuse, DataInputView source)
-                throws IOException {
-            return deserialize(source);
-        }
-
-        @Override
-        public void copy(DataInputView source, DataOutputView target) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj == this) {
-                return true;
-            }
-
-            if (obj == null || !Objects.equals(obj.getClass(), getClass())) {
-                return false;
-            }
-
-            SharedBufferSerializer other = (SharedBufferSerializer) obj;
-            return Objects.equals(keySerializer, other.getKeySerializer())
-                    && Objects.equals(valueSerializer, other.getValueSerializer())
-                    && Objects.equals(versionSerializer, other.getVersionSerializer());
-        }
-
-        @Override
-        public int hashCode() {
-            return 37 * keySerializer.hashCode() + valueSerializer.hashCode();
-        }
-
-        @Override
-        public SharedBufferSerializerSnapshot<K, V> snapshotConfiguration() {
-            return new SharedBufferSerializerSnapshot<>(this);
-        }
-    }
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
index 84d61fb..d01c7e7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -22,16 +22,10 @@
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.cep.configuration.SharedBufferCacheConfig;
-import org.apache.flink.cep.nfa.DeweyNumber;
-import org.apache.flink.cep.nfa.NFAState;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.util.WrappingRuntimeException;
 
 import org.apache.flink.shaded.guava32.com.google.common.cache.Cache;
@@ -73,7 +67,6 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(SharedBuffer.class);
 
-    private static final String LEGACY_ENTRIES_STATE_NAME = "sharedBuffer-entries";
     private static final String ENTRIES_STATE_NAME = "sharedBuffer-entries-with-lockable-edges";
     private static final String EVENTS_STATE_NAME = "sharedBuffer-events";
     private static final String EVENTS_COUNT_STATE_NAME = "sharedBuffer-events-count";
@@ -180,81 +173,6 @@
                 cacheConfig.getCacheStatisticsInterval().toMillis());
     }
 
-    public void migrateOldState(
-            KeyedStateBackend<?> stateBackend, ValueState<NFAState> computationStates)
-            throws Exception {
-        stateBackend.applyToAllKeys(
-                VoidNamespace.INSTANCE,
-                VoidNamespaceSerializer.INSTANCE,
-                new MapStateDescriptor<>(
-                        LEGACY_ENTRIES_STATE_NAME,
-                        new NodeId.NodeIdSerializer(),
-                        new Lockable.LockableTypeSerializer<>(
-                                new SharedBufferNode.SharedBufferNodeSerializer())),
-                (key, state) -> {
-                    copyEntries(state);
-                    state.entries().forEach(this::lockPredecessorEdges);
-                    state.clear();
-
-                    NFAState nfaState = computationStates.value();
-                    nfaState.getPartialMatches()
-                            .forEach(
-                                    computationState ->
-                                            lockEdges(
-                                                    computationState.getPreviousBufferEntry(),
-                                                    computationState.getVersion()));
-                    nfaState.getCompletedMatches()
-                            .forEach(
-                                    computationState ->
-                                            lockEdges(
-                                                    computationState.getPreviousBufferEntry(),
-                                                    computationState.getVersion()));
-                });
-    }
-
-    private void copyEntries(MapState<NodeId, Lockable<SharedBufferNode>> state) throws Exception {
-        state.entries()
-                .forEach(
-                        e -> {
-                            try {
-                                entries.put(e.getKey(), e.getValue());
-                            } catch (Exception exception) {
-                                throw new RuntimeException(exception);
-                            }
-                        });
-    }
-
-    private void lockPredecessorEdges(Map.Entry<NodeId, Lockable<SharedBufferNode>> e) {
-        SharedBufferNode oldNode = e.getValue().getElement();
-        oldNode.getEdges()
-                .forEach(
-                        edge -> {
-                            SharedBufferEdge oldEdge = edge.getElement();
-                            lockEdges(oldEdge.getTarget(), oldEdge.getDeweyNumber());
-                        });
-    }
-
-    private void lockEdges(NodeId nodeId, DeweyNumber version) {
-
-        if (nodeId == null) {
-            return;
-        }
-
-        try {
-            SharedBufferNode newNode = entries.get(nodeId).getElement();
-            newNode.getEdges()
-                    .forEach(
-                            newEdge -> {
-                                if (version.isCompatibleWith(
-                                        newEdge.getElement().getDeweyNumber())) {
-                                    newEdge.lock();
-                                }
-                            });
-        } catch (Exception exception) {
-            throw new RuntimeException(exception);
-        }
-    }
-
     /**
      * Construct an accessor to deal with this sharedBuffer.
      *
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
index 8339496..e32569b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
@@ -18,24 +18,10 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge.SharedBufferEdgeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /** An entry in {@link SharedBuffer} that allows to store relations between different entries. */
 public class SharedBufferNode {
 
@@ -78,124 +64,4 @@
     public int hashCode() {
         return Objects.hash(edges);
     }
-
-    /**
-     * Serializer for {@link SharedBufferNode}.
-     *
-     * <p>This serializer had to be deprecated and you cannot directly migrate to the newer version.
-     * The new structure requires additional information from other nodes. The migration happens in
-     * {@link SharedBuffer#migrateOldState(KeyedStateBackend, ValueState)}.
-     *
-     * @deprecated was used in <= 1.12, use {@link
-     *     org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNodeSerializer} instead.
-     */
-    @Deprecated
-    public static class SharedBufferNodeSerializer
-            extends TypeSerializerSingleton<SharedBufferNode> {
-
-        private static final long serialVersionUID = -6687780732295439832L;
-
-        private final ListSerializer<SharedBufferEdge> edgesSerializer;
-
-        public SharedBufferNodeSerializer() {
-            this.edgesSerializer = new ListSerializer<>(new SharedBufferEdgeSerializer());
-        }
-
-        private SharedBufferNodeSerializer(ListSerializer<SharedBufferEdge> edgesSerializer) {
-            this.edgesSerializer = checkNotNull(edgesSerializer);
-        }
-
-        @Override
-        public boolean isImmutableType() {
-            return false;
-        }
-
-        @Override
-        public SharedBufferNode createInstance() {
-            return new SharedBufferNode(new ArrayList<>());
-        }
-
-        @Override
-        public SharedBufferNode copy(SharedBufferNode from) {
-            throw new UnsupportedOperationException("Should not be used");
-        }
-
-        @Override
-        public SharedBufferNode copy(SharedBufferNode from, SharedBufferNode reuse) {
-            return copy(from);
-        }
-
-        @Override
-        public int getLength() {
-            return -1;
-        }
-
-        @Override
-        public void serialize(SharedBufferNode record, DataOutputView target) throws IOException {
-            throw new UnsupportedOperationException("We should no longer use it for serialization");
-        }
-
-        @Override
-        public SharedBufferNode deserialize(DataInputView source) throws IOException {
-            List<SharedBufferEdge> edges = edgesSerializer.deserialize(source);
-            SharedBufferNode node = new SharedBufferNode();
-            for (SharedBufferEdge edge : edges) {
-                node.addEdge(edge);
-            }
-            return node;
-        }
-
-        @Override
-        public SharedBufferNode deserialize(SharedBufferNode reuse, DataInputView source)
-                throws IOException {
-            return deserialize(source);
-        }
-
-        @Override
-        public void copy(DataInputView source, DataOutputView target) throws IOException {
-            edgesSerializer.copy(source, target);
-        }
-
-        // -----------------------------------------------------------------------------------
-
-        @Override
-        public TypeSerializerSnapshot<SharedBufferNode> snapshotConfiguration() {
-            return new SharedBufferNodeSerializerSnapshot(this);
-        }
-
-        /** Serializer configuration snapshot for compatibility and format evolution. */
-        @SuppressWarnings("WeakerAccess")
-        public static final class SharedBufferNodeSerializerSnapshot
-                extends CompositeTypeSerializerSnapshot<
-                        SharedBufferNode, SharedBufferNodeSerializer> {
-
-            private static final int VERSION = 1;
-
-            public SharedBufferNodeSerializerSnapshot() {}
-
-            public SharedBufferNodeSerializerSnapshot(
-                    SharedBufferNodeSerializer sharedBufferNodeSerializer) {
-                super(sharedBufferNodeSerializer);
-            }
-
-            @Override
-            protected int getCurrentOuterSnapshotVersion() {
-                return VERSION;
-            }
-
-            @Override
-            @SuppressWarnings("unchecked")
-            protected SharedBufferNodeSerializer createOuterSerializerWithNestedSerializers(
-                    TypeSerializer<?>[] nestedSerializers) {
-                return new SharedBufferNodeSerializer(
-                        (ListSerializer<SharedBufferEdge>) nestedSerializers[0]);
-            }
-
-            @Override
-            protected TypeSerializer<?>[] getNestedSerializers(
-                    SharedBufferNodeSerializer outerSerializer) {
-                return new TypeSerializer<?>[] {outerSerializer.edgesSerializer};
-            }
-        }
-    }
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index b2f4020..49b0849 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -199,10 +199,6 @@
                                         EVENT_QUEUE_STATE_NAME,
                                         LongSerializer.INSTANCE,
                                         new ListSerializer<>(inputSerializer)));
-
-        if (context.isRestored()) {
-            partialMatches.migrateOldState(getKeyedStateBackend(), computationStates);
-        }
     }
 
     @Override
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 0b41224..fdbf8e5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -29,7 +29,6 @@
 import org.apache.flink.cep.pattern.conditions.RichAndCondition;
 import org.apache.flink.cep.pattern.conditions.RichOrCondition;
 import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -104,24 +103,10 @@
         return name;
     }
 
-    /** @deprecated Use {@link #getWindowSize()} */
-    @Deprecated
-    @Nullable
-    public Time getWindowTime() {
-        return getWindowSize().map(Time::of).orElse(null);
-    }
-
     public Optional<Duration> getWindowSize() {
         return getWindowSize(WithinType.FIRST_AND_LAST);
     }
 
-    /** @deprecated Use {@link #getWindowSize(WithinType)}. */
-    @Deprecated
-    @Nullable
-    public Time getWindowTime(WithinType withinType) {
-        return getWindowSize(withinType).map(Time::of).orElse(null);
-    }
-
     public Optional<Duration> getWindowSize(WithinType withinType) {
         return Optional.ofNullable(windowTimes.get(withinType));
     }
@@ -266,20 +251,6 @@
      *
      * @param windowTime Time of the matching window
      * @return The same pattern operator with the new window length
-     * @deprecated Use {@link #within(Duration)}.
-     */
-    @Deprecated
-    public Pattern<T, F> within(@Nullable Time windowTime) {
-        return within(Time.toDuration(windowTime));
-    }
-
-    /**
-     * Defines the maximum time interval in which a matching pattern has to be completed in order to
-     * be considered valid. This interval corresponds to the maximum time gap between first and the
-     * last event.
-     *
-     * @param windowTime Time of the matching window
-     * @return The same pattern operator with the new window length
      */
     public Pattern<T, F> within(@Nullable Duration windowTime) {
         return within(windowTime, WithinType.FIRST_AND_LAST);
@@ -292,20 +263,6 @@
      * @param withinType Type of the within interval between events
      * @param windowTime Time of the matching window
      * @return The same pattern operator with the new window length
-     * @deprecated Use {@link #within(Duration, WithinType)}.
-     */
-    @Deprecated
-    public Pattern<T, F> within(@Nullable Time windowTime, WithinType withinType) {
-        return within(Time.toDuration(windowTime), withinType);
-    }
-
-    /**
-     * Defines the maximum time interval in which a matching pattern has to be completed in order to
-     * be considered valid. This interval corresponds to the maximum time gap between events.
-     *
-     * @param withinType Type of the within interval between events
-     * @param windowTime Time of the matching window
-     * @return The same pattern operator with the new window length
      */
     public Pattern<T, F> within(@Nullable Duration windowTime, WithinType withinType) {
         if (windowTime != null) {
@@ -429,26 +386,6 @@
      * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier
      *     applied.
      * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
-     * @deprecated Use {@link #oneOrMore(Duration)}
-     */
-    @Deprecated
-    public Pattern<T, F> oneOrMore(@Nullable Time windowTime) {
-        return oneOrMore(Time.toDuration(windowTime));
-    }
-
-    /**
-     * Specifies that this pattern can occur {@code one or more} times and time interval corresponds
-     * to the maximum time gap between previous and current event for each times. This means at
-     * least one and at most infinite number of events can be matched to this pattern.
-     *
-     * <p>If this quantifier is enabled for a pattern {@code A.oneOrMore().followedBy(B)} and a
-     * sequence of events {@code A1 A2 B} appears, this will generate patterns: {@code A1 B} and
-     * {@code A1 A2 B}. See also {@link #allowCombinations()}.
-     *
-     * @param windowTime time of the matching window between times
-     * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier
-     *     applied.
-     * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
      */
     public Pattern<T, F> oneOrMore(@Nullable Duration windowTime) {
         checkIfNoNotPattern();
@@ -491,21 +428,6 @@
      * @param windowTime time of the matching window between times
      * @return The same pattern with number of times applied
      * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
-     * @deprecated Using {@link #times(int, Duration)}
-     */
-    @Deprecated
-    public Pattern<T, F> times(int times, @Nullable Time windowTime) {
-        return times(times, Time.toDuration(windowTime));
-    }
-
-    /**
-     * Specifies exact number of times that this pattern should be matched and time interval
-     * corresponds to the maximum time gap between previous and current event for each times.
-     *
-     * @param times number of times matching event must appear
-     * @param windowTime time of the matching window between times
-     * @return The same pattern with number of times applied
-     * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
      */
     public Pattern<T, F> times(int times, @Nullable Duration windowTime) {
         checkIfNoNotPattern();
@@ -537,22 +459,6 @@
      * @param windowTime time of the matching window between times
      * @return The same pattern with the number of times range applied
      * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
-     * @deprecated Use {@link #times(int, int, Duration)}
-     */
-    @Deprecated
-    public Pattern<T, F> times(int from, int to, @Nullable Time windowTime) {
-        return times(from, to, Time.toDuration(windowTime));
-    }
-
-    /**
-     * Specifies that the pattern can occur between from and to times with time interval corresponds
-     * to the maximum time gap between previous and current event for each times.
-     *
-     * @param from number of times matching event must appear at least
-     * @param to number of times matching event must appear at most
-     * @param windowTime time of the matching window between times
-     * @return The same pattern with the number of times range applied
-     * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
      */
     public Pattern<T, F> times(int from, int to, @Nullable Duration windowTime) {
         checkIfNoNotPattern();
@@ -589,24 +495,6 @@
      * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier
      *     applied.
      * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
-     * @deprecated Use {@link #timesOrMore(int, Duration)}
-     */
-    @Deprecated
-    public Pattern<T, F> timesOrMore(int times, @Nullable Time windowTime) {
-        return timesOrMore(times, Time.toDuration(windowTime));
-    }
-
-    /**
-     * Specifies that this pattern can occur the specified times at least with interval corresponds
-     * to the maximum time gap between previous and current event for each times. This means at
-     * least the specified times and at most infinite number of events can be matched to this
-     * pattern.
-     *
-     * @param times number of times at least matching event must appear
-     * @param windowTime time of the matching window between times
-     * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier
-     *     applied.
-     * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
      */
     public Pattern<T, F> timesOrMore(int times, @Nullable Duration windowTime) {
         checkIfNoNotPattern();
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 2e5efd4..721b8a4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.pattern;
 
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -213,32 +212,14 @@
             return to;
         }
 
-        /** @deprecated Use {@link #getWindowSize()}. */
-        @Deprecated
-        public Time getWindowTime() {
-            return getWindowSize().map(Time::of).orElse(null);
-        }
-
         public Optional<Duration> getWindowSize() {
             return Optional.ofNullable(windowTime);
         }
 
-        /** @deprecated Use {@link #of(int, int, Duration)} */
-        @Deprecated
-        public static Times of(int from, int to, @Nullable Time windowTime) {
-            return of(from, to, Time.toDuration(windowTime));
-        }
-
         public static Times of(int from, int to, @Nullable Duration windowTime) {
             return new Times(from, to, windowTime);
         }
 
-        /** @deprecated Use {@link #of(int, Duration)} */
-        @Deprecated
-        public static Times of(int times, @Nullable Time windowTime) {
-            return of(times, Time.toDuration(windowTime));
-        }
-
         public static Times of(int times, @Nullable Duration windowTime) {
             return new Times(times, times, windowTime);
         }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
deleted file mode 100644
index 09ca672..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.pattern.conditions;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A {@link IterativeCondition condition} which combines two conditions with a logical {@code AND}
- * and returns {@code true} if both are {@code true}.
- *
- * @param <T> Type of the element to filter
- * @deprecated Please use {@link RichAndCondition} instead. This class exists just for backwards
- *     compatibility and will be removed in FLINK-10113.
- */
-@Internal
-@Deprecated
-public class AndCondition<T> extends IterativeCondition<T> {
-
-    private static final long serialVersionUID = -2471892317390197319L;
-
-    private final IterativeCondition<T> left;
-    private final IterativeCondition<T> right;
-
-    public AndCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) {
-        this.left = Preconditions.checkNotNull(left, "The condition cannot be null.");
-        this.right = Preconditions.checkNotNull(right, "The condition cannot be null.");
-    }
-
-    @Override
-    public boolean filter(T value, Context<T> ctx) throws Exception {
-        return left.filter(value, ctx) && right.filter(value, ctx);
-    }
-
-    /** @return One of the {@link IterativeCondition conditions} combined in this condition. */
-    public IterativeCondition<T> getLeft() {
-        return left;
-    }
-
-    /** @return One of the {@link IterativeCondition conditions} combined in this condition. */
-    public IterativeCondition<T> getRight() {
-        return right;
-    }
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
deleted file mode 100644
index 401b08f..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.pattern.conditions;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * A {@link IterativeCondition condition} which negates the condition it wraps and returns {@code
- * true} if the original condition returns {@code false}.
- *
- * @param <T> Type of the element to filter
- * @deprecated Please use {@link RichNotCondition} instead. This class exists just for backwards
- *     compatibility and will be removed in FLINK-10113.
- */
-@Internal
-@Deprecated
-public class NotCondition<T> extends IterativeCondition<T> {
-    private static final long serialVersionUID = -2109562093871155005L;
-
-    private final IterativeCondition<T> original;
-
-    public NotCondition(final IterativeCondition<T> original) {
-        this.original = original;
-    }
-
-    @Override
-    public boolean filter(T value, Context<T> ctx) throws Exception {
-        return original != null && !original.filter(value, ctx);
-    }
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
deleted file mode 100644
index 4c2c3b0..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.pattern.conditions;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A {@link IterativeCondition condition} which combines two conditions with a logical {@code OR}
- * and returns {@code true} if at least one is {@code true}.
- *
- * @param <T> Type of the element to filter
- * @deprecated Please use {@link RichOrCondition} instead. This class exists just for backwards
- *     compatibility and will be removed in FLINK-10113.
- */
-@Internal
-@Deprecated
-public class OrCondition<T> extends IterativeCondition<T> {
-
-    private static final long serialVersionUID = 2554610954278485106L;
-
-    private final IterativeCondition<T> left;
-    private final IterativeCondition<T> right;
-
-    public OrCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) {
-        this.left = Preconditions.checkNotNull(left, "The condition cannot be null.");
-        this.right = Preconditions.checkNotNull(right, "The condition cannot be null.");
-    }
-
-    @Override
-    public boolean filter(T value, Context<T> ctx) throws Exception {
-        return left.filter(value, ctx) || right.filter(value, ctx);
-    }
-
-    /** @return One of the {@link IterativeCondition conditions} combined in this condition. */
-    public IterativeCondition<T> getLeft() {
-        return left;
-    }
-
-    /** @return One of the {@link IterativeCondition conditions} combined in this condition. */
-    public IterativeCondition<T> getRight() {
-        return right;
-    }
-}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 59a47a3..86e2afa 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -39,12 +39,11 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.util.AbstractTestBaseJUnit4;
-import org.apache.flink.types.Either;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
@@ -423,7 +422,7 @@
         Pattern<Integer, ?> pattern =
                 Pattern.<Integer>begin("start")
                         .followedByAny("end")
-                        .within(Time.days(1), withinType);
+                        .within(Duration.ofDays(1), withinType);
 
         DataStream<Integer> result =
                 CEP.pattern(input, pattern)
@@ -490,11 +489,13 @@
                         .where(SimpleCondition.of(value -> value.getName().equals("middle")))
                         .followedByAny("end")
                         .where(SimpleCondition.of(value -> value.getName().equals("end")))
-                        .within(Time.milliseconds(3));
+                        .within(Duration.ofMillis(3));
 
-        DataStream<Either<String, String>> result =
+        OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
+        SingleOutputStreamOperator<String> result =
                 CEP.pattern(input, pattern)
                         .select(
+                                outputTag,
                                 new PatternTimeoutFunction<Event, String>() {
                                     @Override
                                     public String timeout(
@@ -519,20 +520,21 @@
                                     }
                                 });
 
-        List<Either<String, String>> resultList = new ArrayList<>();
+        List<String> resultList = new ArrayList<>();
 
         result.executeAndCollect().forEachRemaining(resultList::add);
 
         resultList.sort(Comparator.comparing(Object::toString));
 
-        List<Either<String, String>> expected =
-                Arrays.asList(
-                        Either.Left.of("1.0"),
-                        Either.Left.of("2.0"),
-                        Either.Left.of("2.0"),
-                        Either.Right.of("2.0,2.0,2.0"));
+        List<String> timeoutList = new ArrayList<>();
+        result.getSideOutput(outputTag).executeAndCollect().forEachRemaining(timeoutList::add);
+        timeoutList.sort(Comparator.comparing(Object::toString));
 
-        assertEquals(expected, resultList);
+        List<String> timeoutExpected = Arrays.asList("1.0", "2.0", "2.0");
+        List<String> resultExpected = Arrays.asList("2.0,2.0,2.0");
+
+        assertEquals(timeoutExpected, timeoutList);
+        assertEquals(resultExpected, resultList);
     }
 
     @Test
@@ -580,11 +582,14 @@
                         .where(SimpleCondition.of(value -> value.getName().equals("middle")))
                         .followedByAny("end")
                         .where(SimpleCondition.of(value -> value.getName().equals("end")))
-                        .within(Time.milliseconds(3), WithinType.PREVIOUS_AND_CURRENT);
+                        .within(Duration.ofMillis(3), WithinType.PREVIOUS_AND_CURRENT);
 
-        DataStream<Either<String, String>> result =
+        OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
+
+        SingleOutputStreamOperator<String> result =
                 CEP.pattern(input, pattern)
                         .select(
+                                outputTag,
                                 new PatternTimeoutFunction<Event, String>() {
                                     @Override
                                     public String timeout(
@@ -609,20 +614,21 @@
                                     }
                                 });
 
-        List<Either<String, String>> resultList = new ArrayList<>();
+        List<String> resultList = new ArrayList<>();
 
         result.executeAndCollect().forEachRemaining(resultList::add);
 
         resultList.sort(Comparator.comparing(Object::toString));
 
-        List<Either<String, String>> expected =
-                Arrays.asList(
-                        Either.Left.of("1.0"),
-                        Either.Left.of("2.0"),
-                        Either.Right.of("1.0,2.0,2.0"),
-                        Either.Right.of("2.0,2.0,2.0"));
+        List<String> timeoutList = new ArrayList<>();
+        result.getSideOutput(outputTag).executeAndCollect().forEachRemaining(timeoutList::add);
+        timeoutList.sort(Comparator.comparing(Object::toString));
 
-        assertEquals(expected, resultList);
+        List<String> timeoutExpected = Arrays.asList("1.0", "2.0");
+        List<String> resultExpected = Arrays.asList("1.0,2.0,2.0", "2.0,2.0,2.0");
+
+        assertEquals(timeoutExpected, timeoutList);
+        assertEquals(resultExpected, resultList);
     }
 
     /**
@@ -1081,7 +1087,7 @@
                                         }
                                     }
                                 })
-                        .within(Time.milliseconds(100L));
+                        .within(Duration.ofMillis(100L));
 
         DataStream<String> result =
                 CEP.pattern(input, pattern)
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerUpgradeTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerUpgradeTest.java
index ab17a2b..d9f2392 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerUpgradeTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerUpgradeTest.java
@@ -31,6 +31,7 @@
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNodeSerializer;
+import org.apache.flink.test.util.MigrationTest;
 
 import org.assertj.core.api.Condition;
 
@@ -41,6 +42,13 @@
 /** Migration tests for NFA-related serializers. */
 class NFASerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Object, Object> {
 
+    // we dropped support for old versions and only test against versions since 1.20.
+    @Override
+    public Collection<FlinkVersion> getMigrationVersions() {
+        return FlinkVersion.rangeOf(
+                FlinkVersion.v1_20, MigrationTest.getMostRecentlyPublishedVersion());
+    }
+
     public Collection<TestSpecification<?, ?>> createTestSpecifications(FlinkVersion flinkVersion)
             throws Exception {
         ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>();
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index d39c83e..cc6d538 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -31,7 +31,6 @@
 import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.cep.utils.TestTimerService;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
@@ -43,6 +42,7 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -297,7 +297,7 @@
                         .where(SimpleCondition.of(value -> value.getName().equals("middle")))
                         .followedBy("end")
                         .where(SimpleCondition.of(value -> value.getName().equals("end")))
-                        .within(Time.milliseconds(10));
+                        .within(Duration.ofMillis(10));
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -336,7 +336,7 @@
                         .where(SimpleCondition.of(value -> value.getName().equals("middle")))
                         .followedBy("end")
                         .where(SimpleCondition.of(value -> value.getName().equals("end")))
-                        .within(Time.milliseconds(9), WithinType.PREVIOUS_AND_CURRENT);
+                        .within(Duration.ofMillis(9), WithinType.PREVIOUS_AND_CURRENT);
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -393,7 +393,7 @@
                         .where(SimpleCondition.of(value -> value.getName().equals("middle")))
                         .followedByAny("end")
                         .where(SimpleCondition.of(value -> value.getName().equals("end")))
-                        .within(Time.milliseconds(10));
+                        .within(Duration.ofMillis(10));
 
         NFA<Event> nfa = compile(pattern, true);
 
@@ -463,7 +463,7 @@
                         .where(SimpleCondition.of(value -> value.getName().equals("middle")))
                         .followedByAny("end")
                         .where(SimpleCondition.of(value -> value.getName().equals("end")))
-                        .within(Time.milliseconds(10), WithinType.PREVIOUS_AND_CURRENT);
+                        .within(Duration.ofMillis(10), WithinType.PREVIOUS_AND_CURRENT);
 
         NFA<Event> nfa = compile(pattern, true);
 
@@ -532,7 +532,7 @@
                         .where(SimpleCondition.of(value -> value.getName().equals("start")))
                         .notFollowedBy("middle")
                         .where(SimpleCondition.of(value -> value.getName().equals("middle")))
-                        .within(Time.milliseconds(5), withinType);
+                        .within(Duration.ofMillis(5), withinType);
 
         NFA<Event> nfa = compile(pattern, true);
 
@@ -2304,7 +2304,7 @@
                         .times(2)
                         .followedBy("end1")
                         .where(SimpleCondition.of(value -> value.getName().equals("b")))
-                        .within(Time.milliseconds(8), withinType);
+                        .within(Duration.ofMillis(8), withinType);
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -2348,7 +2348,7 @@
                         .optional()
                         .followedBy("end1")
                         .where(SimpleCondition.of(value -> value.getName().equals("b")))
-                        .within(Time.milliseconds(8), withinType);
+                        .within(Duration.ofMillis(8), withinType);
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -2391,7 +2391,7 @@
                         .allowCombinations()
                         .followedBy("end1")
                         .where(SimpleCondition.of(value -> value.getName().equals("b")))
-                        .within(Time.milliseconds(8));
+                        .within(Duration.ofMillis(8));
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -2427,7 +2427,7 @@
                         .optional()
                         .followedBy("end1")
                         .where(SimpleCondition.of(value -> value.getName().equals("b")))
-                        .within(Time.milliseconds(8));
+                        .within(Duration.ofMillis(8));
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -2463,7 +2463,7 @@
                         .optional()
                         .followedBy("end1")
                         .where(SimpleCondition.of(value -> value.getName().equals("b")))
-                        .within(Time.milliseconds(8), WithinType.PREVIOUS_AND_CURRENT);
+                        .within(Duration.ofMillis(8), WithinType.PREVIOUS_AND_CURRENT);
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -2862,7 +2862,7 @@
                 Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                         .times(4)
                         .where(SimpleCondition.of(value -> value.getName().equals("a")))
-                        .within(Time.milliseconds(3), withinType);
+                        .within(Duration.ofMillis(3), withinType);
 
         Event a1 = new Event(40, "a", 1.0);
         Event a2 = new Event(40, "a", 1.0);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index 12ecee5..52b4806 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -29,12 +29,12 @@
 import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.cep.utils.TestTimerService;
-import org.apache.flink.streaming.api.windowing.time.Time;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -106,7 +106,7 @@
                                         return value.getName().equals("e");
                                     }
                                 })
-                        .within(Time.milliseconds(10));
+                        .within(Duration.ofMillis(10));
 
         NFA<Event> nfa = compile(pattern, true);
 
@@ -228,7 +228,7 @@
                                         return value.getName().equals("b");
                                     }
                                 })
-                        .within(Time.milliseconds(10));
+                        .within(Duration.ofMillis(10));
 
         NFA<Event> nfa = compile(pattern, true);
 
@@ -278,7 +278,7 @@
                                         return value.getName().equals("end");
                                     }
                                 })
-                        .within(Time.milliseconds(10));
+                        .within(Duration.ofMillis(10));
 
         NFA<Event> nfa = compile(pattern, true);
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
index 14c092b..3b4e270 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
@@ -24,7 +24,6 @@
 import org.apache.flink.cep.pattern.WithinType;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.cep.utils.NFATestHarness;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
@@ -32,6 +31,7 @@
 
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -1018,7 +1018,7 @@
                         .where(SimpleCondition.of(value -> value.getName().equals("a")))
                         .notFollowedBy("b")
                         .where(SimpleCondition.of(value -> value.getName().equals("b")))
-                        .within(Time.milliseconds(3), withinType);
+                        .within(Duration.ofMillis(3), withinType);
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -1057,7 +1057,7 @@
                         .followedBy("c")
                         .where(SimpleCondition.of(value -> value.getName().equals("c")))
                         .times(0, 2)
-                        .within(Time.milliseconds(3));
+                        .within(Duration.ofMillis(3));
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -1099,7 +1099,7 @@
                         .where(SimpleCondition.of(value -> value.getName().equals("c")))
                         .notFollowedBy("b")
                         .where(SimpleCondition.of(value -> value.getName().equals("b")))
-                        .within(Time.milliseconds(5));
+                        .within(Duration.ofMillis(5));
 
         NFA<Event> nfa = compile(pattern, false);
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
index eb8d175..6d69337 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
@@ -31,6 +31,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -44,11 +45,11 @@
 @RunWith(Parameterized.class)
 public class TimesOrMoreITCase extends TestLogger {
 
-    @Parameterized.Parameter public Time time;
+    @Parameterized.Parameter public Duration time;
 
     @Parameterized.Parameters(name = "Times Range Time: {0}")
-    public static Collection<Time> parameters() {
-        return Arrays.asList(null, Time.milliseconds(3));
+    public static Collection<Duration> parameters() {
+        return Arrays.asList(null, Time.milliseconds(3).toDuration());
     }
 
     @Test
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
index 8160669..28f6532 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -21,7 +21,6 @@
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
@@ -31,6 +30,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,11 +45,11 @@
 @RunWith(Parameterized.class)
 public class TimesRangeITCase extends TestLogger {
 
-    @Parameterized.Parameter public Time time;
+    @Parameterized.Parameter public Duration time;
 
     @Parameterized.Parameters(name = "Times Range Time: {0}")
-    public static Collection<Time> parameters() {
-        return Arrays.asList(null, Time.milliseconds(3));
+    public static Collection<Duration> parameters() {
+        return Arrays.asList(null, Duration.ofMillis(3));
     }
 
     @Test
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 48df918..51f8d89 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -39,6 +39,7 @@
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -176,7 +177,7 @@
                         .where(startFilter)
                         .notFollowedBy("middle")
                         .where(endFilter)
-                        .within(Time.milliseconds(1));
+                        .within(Duration.ofMillis(1));
 
         NFA<Event> nfa = compile(pattern, false);
 
@@ -301,9 +302,9 @@
         Pattern<Event, ?> pattern =
                 Pattern.<Event>begin("start")
                         .followedBy("middle")
-                        .within(Time.seconds(10))
+                        .within(Duration.ofSeconds(10))
                         .followedBy("then")
-                        .within(Time.seconds(20))
+                        .within(Duration.ofSeconds(20))
                         .followedBy("end");
 
         NFACompiler.NFAFactoryCompiler<Event> factory =
@@ -317,9 +318,9 @@
         Pattern<Event, ?> pattern =
                 Pattern.<Event>begin("start")
                         .followedBy("middle")
-                        .within(Time.seconds(10), WithinType.PREVIOUS_AND_CURRENT)
+                        .within(Duration.ofSeconds(10), WithinType.PREVIOUS_AND_CURRENT)
                         .followedBy("then")
-                        .within(Time.seconds(20), WithinType.PREVIOUS_AND_CURRENT)
+                        .within(Duration.ofSeconds(20), WithinType.PREVIOUS_AND_CURRENT)
                         .followedBy("end");
 
         NFACompiler.NFAFactoryCompiler<Event> factory =
@@ -337,9 +338,9 @@
         Pattern<Event, ?> pattern =
                 Pattern.<Event>begin("start")
                         .followedBy("middle")
-                        .within(Time.seconds(10))
+                        .within(Duration.ofSeconds(10))
                         .followedBy("then")
-                        .within(Time.seconds(0))
+                        .within(Duration.ofSeconds(0))
                         .followedBy("end");
 
         NFACompiler.NFAFactoryCompiler<Event> factory =
@@ -357,11 +358,11 @@
         Pattern<Event, ?> pattern =
                 Pattern.<Event>begin("start")
                         .followedBy("middle")
-                        .within(Time.seconds(3), WithinType.PREVIOUS_AND_CURRENT)
+                        .within(Duration.ofSeconds(3), WithinType.PREVIOUS_AND_CURRENT)
                         .followedBy("then")
-                        .within(Time.seconds(1), WithinType.PREVIOUS_AND_CURRENT)
+                        .within(Duration.ofSeconds(1), WithinType.PREVIOUS_AND_CURRENT)
                         .followedBy("end")
-                        .within(Time.milliseconds(2));
+                        .within(Duration.ofMillis(2));
 
         NFACompiler.NFAFactoryCompiler<Event> factory =
                 new NFACompiler.NFAFactoryCompiler<>(pattern);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerUpgradeTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerUpgradeTest.java
index 183d6b4..2014632 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerUpgradeTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerUpgradeTest.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.test.util.MigrationTest;
 
 import org.assertj.core.api.Condition;
 
@@ -36,6 +37,13 @@
 
     private static final String SPEC_NAME = "lockable-type-serializer";
 
+    // we dropped support for old versions and only test against versions since 1.20.
+    @Override
+    public Collection<FlinkVersion> getMigrationVersions() {
+        return FlinkVersion.rangeOf(
+                FlinkVersion.v1_20, MigrationTest.getMostRecentlyPublishedVersion());
+    }
+
     public Collection<TestSpecification<?, ?>> createTestSpecifications(FlinkVersion flinkVersion)
             throws Exception {
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index 54d3237..783589f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -30,7 +30,6 @@
 import org.apache.flink.cep.utils.CepOperatorTestUtilities;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -41,6 +40,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +64,7 @@
     @Parameterized.Parameters(name = "Migration Savepoint: {0}")
     public static Collection<FlinkVersion> parameters() {
         return FlinkVersion.rangeOf(
-                FlinkVersion.v1_8, MigrationTest.getMostRecentlyPublishedVersion());
+                FlinkVersion.v1_20, MigrationTest.getMostRecentlyPublishedVersion());
     }
 
     public CEPMigrationTest(FlinkVersion migrateVersion) {
@@ -632,7 +632,7 @@
             Pattern<Event, ?> pattern =
                     Pattern.<Event>begin("start")
                             .where(new StartFilter())
-                            .within(Time.milliseconds(10L));
+                            .within(Duration.ofMillis(10L));
 
             return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
         }
@@ -661,7 +661,7 @@
                             .where(new MiddleFilter())
                             .or(new SubEventEndFilter())
                             .times(2)
-                            .within(Time.milliseconds(10L));
+                            .within(Duration.ofMillis(10L));
 
             return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
         }
@@ -694,7 +694,7 @@
                             .where(new EndFilter())
                             // add a window timeout to test whether timestamps of elements in the
                             // priority queue in CEP operator are correctly checkpointed/restored
-                            .within(Time.milliseconds(10L));
+                            .within(Duration.ofMillis(10L));
 
             return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
         }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index af42555..69b2414 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -32,7 +32,6 @@
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
@@ -40,6 +39,7 @@
 
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -457,7 +457,7 @@
                             .where(SimpleCondition.of(value -> value.getName().equals("end")))
                             // add a window timeout to test whether timestamps of elements in the
                             // priority queue in CEP operator are correctly checkpointed/restored
-                            .within(Time.milliseconds(10L));
+                            .within(Duration.ofMillis(10L));
 
             return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
         }
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.10-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.10-snapshot
deleted file mode 100644
index a455e3a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.10-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.11-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.11-snapshot
deleted file mode 100644
index aa300c9..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.11-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.12-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.12-snapshot
deleted file mode 100644
index 9ad3913..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.12-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.13-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.13-snapshot
deleted file mode 100644
index 1565922..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.13-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.14-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.14-snapshot
deleted file mode 100644
index 8111bda..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.14-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.15-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.15-snapshot
deleted file mode 100644
index a05be3a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.15-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.16-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.16-snapshot
deleted file mode 100644
index ab29781..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.16-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.17-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.17-snapshot
deleted file mode 100644
index 9c9cd0e..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.17-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.18-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.18-snapshot
deleted file mode 100644
index 94ee7d9..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.18-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.19-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.19-snapshot
deleted file mode 100644
index 42c4893..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.19-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.8-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.8-snapshot
deleted file mode 100644
index 3bde0d6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.8-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.9-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.9-snapshot
deleted file mode 100644
index 4c2e25b..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.9-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.10-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.10-snapshot
deleted file mode 100644
index 06f5693..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.10-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.11-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.11-snapshot
deleted file mode 100644
index cd33712..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.11-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.12-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.12-snapshot
deleted file mode 100644
index d0ebe23..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.12-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.13-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.13-snapshot
deleted file mode 100644
index caf8c51..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.13-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.14-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.14-snapshot
deleted file mode 100644
index a93bb0c..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.14-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.15-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.15-snapshot
deleted file mode 100644
index 7d308fb..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.15-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.16-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.16-snapshot
deleted file mode 100644
index a625f31..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.16-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.17-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.17-snapshot
deleted file mode 100644
index 9bd7c91..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.17-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.18-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.18-snapshot
deleted file mode 100644
index ab2653f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.18-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.19-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.19-snapshot
deleted file mode 100644
index 2423732..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.19-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.8-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.8-snapshot
deleted file mode 100644
index 87b2f30..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.8-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.9-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.9-snapshot
deleted file mode 100644
index c41aa1f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.9-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.10-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.10-snapshot
deleted file mode 100644
index 552b4fa..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.10-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.11-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.11-snapshot
deleted file mode 100644
index 18a7a00..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.11-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.12-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.12-snapshot
deleted file mode 100644
index 0592a5c..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.12-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.13-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.13-snapshot
deleted file mode 100644
index ee7bb33..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.13-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.14-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.14-snapshot
deleted file mode 100644
index bdf332f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.14-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.15-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.15-snapshot
deleted file mode 100644
index 89133ae..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.15-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.16-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.16-snapshot
deleted file mode 100644
index 38d2310..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.16-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.17-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.17-snapshot
deleted file mode 100644
index d7eacf5..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.17-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.18-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.18-snapshot
deleted file mode 100644
index e5704f0..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.18-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.19-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.19-snapshot
deleted file mode 100644
index 7738efe..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.19-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.8-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.8-snapshot
deleted file mode 100644
index 4a9439c..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.8-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.9-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.9-snapshot
deleted file mode 100644
index 50d6799..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.9-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.10-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.10-snapshot
deleted file mode 100644
index 8c8e0a1..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.10-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.11-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.11-snapshot
deleted file mode 100644
index e48b010..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.11-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.12-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.12-snapshot
deleted file mode 100644
index 43d5436..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.12-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.13-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.13-snapshot
deleted file mode 100644
index cebbfee..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.13-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.14-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.14-snapshot
deleted file mode 100644
index a551ba1..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.14-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.15-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.15-snapshot
deleted file mode 100644
index 4b95652..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.15-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.16-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.16-snapshot
deleted file mode 100644
index 16d3938..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.16-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.17-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.17-snapshot
deleted file mode 100644
index 910e784..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.17-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.18-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.18-snapshot
deleted file mode 100644
index 05bbdb8..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.18-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.19-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.19-snapshot
deleted file mode 100644
index 007fdf5..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.19-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.8-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.8-snapshot
deleted file mode 100644
index ee1f821..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.8-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.9-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.9-snapshot
deleted file mode 100644
index 7b0ba00..0000000
--- a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.9-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.11/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.11/serializer-snapshot
deleted file mode 100644
index 338c628..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.11/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.11/test-data b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.11/test-data
deleted file mode 100644
index f870518..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.11/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.12/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.12/serializer-snapshot
deleted file mode 100644
index 338c628..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.12/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.12/test-data b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.12/test-data
deleted file mode 100644
index f870518..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.12/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.13/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.13/serializer-snapshot
deleted file mode 100644
index 338c628..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.13/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.13/test-data b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.13/test-data
deleted file mode 100644
index f870518..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.13/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.14/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.14/serializer-snapshot
deleted file mode 100644
index 338c628..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.14/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.14/test-data b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.14/test-data
deleted file mode 100644
index f870518..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.14/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.15/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.15/serializer-snapshot
deleted file mode 100644
index 338c628..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.15/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.15/test-data b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.15/test-data
deleted file mode 100644
index f870518..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.15/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.16/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.16/serializer-snapshot
deleted file mode 100644
index 338c628..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.16/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.16/test-data b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.16/test-data
deleted file mode 100644
index f870518..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.16/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.17/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.17/serializer-snapshot
deleted file mode 100644
index 338c628..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.17/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.17/test-data b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.17/test-data
deleted file mode 100644
index f870518..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.17/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.18/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.18/serializer-snapshot
deleted file mode 100644
index 338c628..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.18/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.18/test-data b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.18/test-data
deleted file mode 100644
index f870518..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.18/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.19/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.19/serializer-snapshot
deleted file mode 100644
index 338c628..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.19/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.19/test-data b/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.19/test-data
deleted file mode 100644
index f870518..0000000
--- a/flink-libraries/flink-cep/src/test/resources/dewey-number-serializer-1.19/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.11/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.11/serializer-snapshot
deleted file mode 100644
index 40dcf61..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.11/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.11/test-data b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.11/test-data
deleted file mode 100644
index 146a1fc..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.11/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.12/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.12/serializer-snapshot
deleted file mode 100644
index 40dcf61..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.12/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.12/test-data b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.12/test-data
deleted file mode 100644
index 146a1fc..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.12/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.13/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.13/serializer-snapshot
deleted file mode 100644
index 40dcf61..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.13/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.13/test-data b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.13/test-data
deleted file mode 100644
index 146a1fc..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.13/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.14/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.14/serializer-snapshot
deleted file mode 100644
index 40dcf61..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.14/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.14/test-data b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.14/test-data
deleted file mode 100644
index 146a1fc..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.14/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.15/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.15/serializer-snapshot
deleted file mode 100644
index 40dcf61..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.15/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.15/test-data b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.15/test-data
deleted file mode 100644
index 146a1fc..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.15/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.16/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.16/serializer-snapshot
deleted file mode 100644
index 40dcf61..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.16/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.16/test-data b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.16/test-data
deleted file mode 100644
index 146a1fc..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.16/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.17/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.17/serializer-snapshot
deleted file mode 100644
index 40dcf61..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.17/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.17/test-data b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.17/test-data
deleted file mode 100644
index 146a1fc..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.17/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.18/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.18/serializer-snapshot
deleted file mode 100644
index 40dcf61..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.18/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.18/test-data b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.18/test-data
deleted file mode 100644
index 146a1fc..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.18/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.19/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.19/serializer-snapshot
deleted file mode 100644
index 40dcf61..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.19/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.19/test-data b/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.19/test-data
deleted file mode 100644
index 146a1fc..0000000
--- a/flink-libraries/flink-cep/src/test/resources/event-id-serializer-1.19/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.11/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.11/serializer-snapshot
deleted file mode 100644
index 1a984b6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.11/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.11/test-data b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.11/test-data
deleted file mode 100644
index 893957a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.11/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.12/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.12/serializer-snapshot
deleted file mode 100644
index 1a984b6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.12/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.12/test-data b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.12/test-data
deleted file mode 100644
index 893957a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.12/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.13/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.13/serializer-snapshot
deleted file mode 100644
index 1a984b6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.13/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.13/test-data b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.13/test-data
deleted file mode 100644
index 893957a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.13/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.14/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.14/serializer-snapshot
deleted file mode 100644
index 1a984b6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.14/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.14/test-data b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.14/test-data
deleted file mode 100644
index 893957a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.14/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.15/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.15/serializer-snapshot
deleted file mode 100644
index 1a984b6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.15/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.15/test-data b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.15/test-data
deleted file mode 100644
index 893957a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.15/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.16/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.16/serializer-snapshot
deleted file mode 100644
index 1a984b6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.16/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.16/test-data b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.16/test-data
deleted file mode 100644
index 893957a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.16/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.17/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.17/serializer-snapshot
deleted file mode 100644
index 1a984b6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.17/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.17/test-data b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.17/test-data
deleted file mode 100644
index 893957a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.17/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.18/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.18/serializer-snapshot
deleted file mode 100644
index 1a984b6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.18/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.18/test-data b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.18/test-data
deleted file mode 100644
index 893957a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.18/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.19/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.19/serializer-snapshot
deleted file mode 100644
index 1a984b6..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.19/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.19/test-data b/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.19/test-data
deleted file mode 100644
index 893957a..0000000
--- a/flink-libraries/flink-cep/src/test/resources/lockable-type-serializer-1.19/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.11/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.11/serializer-snapshot
deleted file mode 100644
index b0d1ccf..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.11/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.11/test-data b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.11/test-data
deleted file mode 100644
index 1b1cb4d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.11/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.12/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.12/serializer-snapshot
deleted file mode 100644
index b0d1ccf..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.12/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.12/test-data b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.12/test-data
deleted file mode 100644
index 1b1cb4d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.12/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.13/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.13/serializer-snapshot
deleted file mode 100644
index b0d1ccf..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.13/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.13/test-data b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.13/test-data
deleted file mode 100644
index 1b1cb4d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.13/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.14/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.14/serializer-snapshot
deleted file mode 100644
index b0d1ccf..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.14/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.14/test-data b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.14/test-data
deleted file mode 100644
index 1b1cb4d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.14/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.15/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.15/serializer-snapshot
deleted file mode 100644
index b0d1ccf..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.15/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.15/test-data b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.15/test-data
deleted file mode 100644
index 1b1cb4d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.15/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.16/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.16/serializer-snapshot
deleted file mode 100644
index 7f01027..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.16/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.16/test-data b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.16/test-data
deleted file mode 100644
index 1b1cb4d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.16/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.17/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.17/serializer-snapshot
deleted file mode 100644
index 7f01027..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.17/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.17/test-data b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.17/test-data
deleted file mode 100644
index 1b1cb4d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.17/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.18/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.18/serializer-snapshot
deleted file mode 100644
index 7f01027..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.18/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.18/test-data b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.18/test-data
deleted file mode 100644
index 1b1cb4d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.18/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.19/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.19/serializer-snapshot
deleted file mode 100644
index 7f01027..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.19/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.19/test-data b/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.19/test-data
deleted file mode 100644
index 1b1cb4d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/nfa-state-serializer-1.19/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.11/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.11/serializer-snapshot
deleted file mode 100644
index a63858d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.11/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.11/test-data b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.11/test-data
deleted file mode 100644
index 0d33392..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.11/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.12/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.12/serializer-snapshot
deleted file mode 100644
index a63858d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.12/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.12/test-data b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.12/test-data
deleted file mode 100644
index 0d33392..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.12/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.13/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.13/serializer-snapshot
deleted file mode 100644
index a63858d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.13/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.13/test-data b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.13/test-data
deleted file mode 100644
index 0d33392..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.13/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.14/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.14/serializer-snapshot
deleted file mode 100644
index a63858d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.14/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.14/test-data b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.14/test-data
deleted file mode 100644
index 0d33392..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.14/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.15/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.15/serializer-snapshot
deleted file mode 100644
index a63858d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.15/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.15/test-data b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.15/test-data
deleted file mode 100644
index 0d33392..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.15/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.16/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.16/serializer-snapshot
deleted file mode 100644
index a63858d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.16/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.16/test-data b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.16/test-data
deleted file mode 100644
index 0d33392..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.16/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.17/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.17/serializer-snapshot
deleted file mode 100644
index a63858d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.17/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.17/test-data b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.17/test-data
deleted file mode 100644
index 0d33392..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.17/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.18/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.18/serializer-snapshot
deleted file mode 100644
index a63858d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.18/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.18/test-data b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.18/test-data
deleted file mode 100644
index 0d33392..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.18/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.19/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.19/serializer-snapshot
deleted file mode 100644
index a63858d..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.19/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.19/test-data b/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.19/test-data
deleted file mode 100644
index 0d33392..0000000
--- a/flink-libraries/flink-cep/src/test/resources/node-id-serializer-1.19/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.11/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.11/serializer-snapshot
deleted file mode 100644
index bd1cd93..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.11/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.11/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.11/test-data
deleted file mode 100644
index 8746c6f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.11/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.12/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.12/serializer-snapshot
deleted file mode 100644
index bd1cd93..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.12/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.12/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.12/test-data
deleted file mode 100644
index 8746c6f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.12/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.13/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.13/serializer-snapshot
deleted file mode 100644
index bd1cd93..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.13/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.13/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.13/test-data
deleted file mode 100644
index 8746c6f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.13/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.14/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.14/serializer-snapshot
deleted file mode 100644
index bd1cd93..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.14/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.14/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.14/test-data
deleted file mode 100644
index 8746c6f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.14/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.15/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.15/serializer-snapshot
deleted file mode 100644
index bd1cd93..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.15/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.15/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.15/test-data
deleted file mode 100644
index 8746c6f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.15/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.16/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.16/serializer-snapshot
deleted file mode 100644
index bd1cd93..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.16/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.16/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.16/test-data
deleted file mode 100644
index 8746c6f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.16/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.17/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.17/serializer-snapshot
deleted file mode 100644
index bd1cd93..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.17/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.17/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.17/test-data
deleted file mode 100644
index 8746c6f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.17/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.18/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.18/serializer-snapshot
deleted file mode 100644
index bd1cd93..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.18/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.18/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.18/test-data
deleted file mode 100644
index 8746c6f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.18/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.19/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.19/serializer-snapshot
deleted file mode 100644
index bd1cd93..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.19/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.19/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.19/test-data
deleted file mode 100644
index 8746c6f..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-edge-serializer-1.19/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.11/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.11/serializer-snapshot
deleted file mode 100644
index bd6c9b2..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.11/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.11/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.11/test-data
deleted file mode 100644
index c578971..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.11/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.12/serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.12/serializer-snapshot
deleted file mode 100644
index bd6c9b2..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.12/serializer-snapshot
+++ /dev/null
Binary files differ
diff --git a/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.12/test-data b/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.12/test-data
deleted file mode 100644
index c578971..0000000
--- a/flink-libraries/flink-cep/src/test/resources/shared-buffer-node-serializer-1.12/test-data
+++ /dev/null
Binary files differ
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
index 519d336..a46fe99 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
@@ -72,6 +72,7 @@
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.tools.RelBuilder;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -232,7 +233,7 @@
 
         final Pattern<RowData, RowData> cepPattern;
         if (matchSpec.getInterval().isPresent()) {
-            Time interval = translateTimeBound(matchSpec.getInterval().get());
+            Duration interval = translateTimeBound(matchSpec.getInterval().get()).toDuration();
             cepPattern = matchSpec.getPattern().accept(patternVisitor).within(interval);
         } else {
             cepPattern = matchSpec.getPattern().accept(patternVisitor);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTest.scala
index 1a234ae..a989370 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTest.scala
@@ -25,6 +25,8 @@
 import org.assertj.core.api.Assertions.{assertThatExceptionOfType, assertThatThrownBy}
 import org.junit.jupiter.api.Test
 
+import java.time.Duration
+
 class PatternTranslatorTest extends PatternTranslatorTestBase {
 
   @Test
@@ -257,7 +259,7 @@
       Pattern
         .begin("A", skipToNext())
         .next("B")
-        .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000 + 4))
+        .within(Duration.ofMillis(10 * 24 * 60 * 60 * 1000 + 4))
     )
 
     verifyPattern(
@@ -273,7 +275,7 @@
       Pattern
         .begin("A", skipToNext())
         .next("B")
-        .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000))
+        .within(Duration.ofMillis(10 * 24 * 60 * 60 * 1000))
     )
 
     verifyPattern(
@@ -289,7 +291,7 @@
       Pattern
         .begin("A", skipToNext())
         .next("B")
-        .within(Time.milliseconds(10 * 60 * 1000))
+        .within(Duration.ofMillis(10 * 60 * 1000))
     )
   }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
index ac00612..f508aee 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
@@ -49,14 +49,11 @@
 import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.cep.nfa.DeweyNumber;
-import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.NFAStateSerializer;
-import org.apache.flink.cep.nfa.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.Lockable;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNodeSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
 import org.apache.flink.formats.avro.typeutils.AvroSerializer;
@@ -156,7 +153,6 @@
                         org.apache.flink.runtime.state.VoidNamespaceSerializer.class.getName(),
                         TestDuplicateSerializer.class.getName(),
                         LinkedListSerializer.class.getName(),
-                        SharedBuffer.SharedBufferSerializer.class.getName(),
                         WindowKeySerializer.class.getName(),
                         DeweyNumber.DeweyNumberSerializer.class.getName(),
                         SharedBufferNodeSerializer.class.getName(),
@@ -182,8 +178,6 @@
                         SharedBufferEdge.SharedBufferEdgeSerializer.class.getName(),
                         RowDataSerializer.class.getName(),
                         DecimalDataSerializer.class.getName(),
-                        SharedBufferNode.SharedBufferNodeSerializer.class.getName(),
-                        NFA.NFASerializer.class.getName(),
                         AvroSerializer.class.getName());
 
         //  type serializer whitelist for TypeSerializerUpgradeTestBase test coverage
@@ -216,7 +210,6 @@
                         GlobalWindow.Serializer.class.getName(),
                         TestDuplicateSerializer.class.getName(),
                         LinkedListSerializer.class.getName(),
-                        SharedBuffer.SharedBufferSerializer.class.getName(),
                         WindowKeySerializer.class.getName(),
                         DeweyNumber.DeweyNumberSerializer.class.getName(),
                         SharedBufferNodeSerializer.class.getName(),
@@ -242,8 +235,6 @@
                         SharedBufferEdge.SharedBufferEdgeSerializer.class.getName(),
                         RowDataSerializer.class.getName(),
                         DecimalDataSerializer.class.getName(),
-                        SharedBufferNode.SharedBufferNodeSerializer.class.getName(),
-                        NFA.NFASerializer.class.getName(),
                         AvroSerializer.class.getName(),
                         // KeyAndValueSerializer shouldn't be used to serialize data to state and
                         // doesn't need to ensure upgrade compatibility.