[FLINK-13586] Make ClosureCleaner.clean() backwards compatible with 1.8.0
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
index 47c55a1..b3948b2 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.cassandra;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
@@ -55,7 +54,7 @@
public CassandraCommitter(ClusterBuilder builder) {
this.builder = builder;
- ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(builder, true);
}
public CassandraCommitter(ClusterBuilder builder, String keySpace) {
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
index 4374deb..6b3d418 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.cassandra;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
@@ -63,7 +62,7 @@
super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
this.insertQuery = insertQuery;
this.builder = builder;
- ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(builder, true);
}
public void open() throws Exception {
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 0e7eb6f..5d758be 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.cassandra;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -65,7 +64,7 @@
this.builder = builder;
this.config = config;
this.failureHandler = Preconditions.checkNotNull(failureHandler);
- ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(builder, true);
}
@Override
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index c8992a7..b028055 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.cassandra;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple;
@@ -63,7 +62,7 @@
super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
this.insertQuery = insertQuery;
this.builder = builder;
- ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(builder, true);
}
public void open() throws Exception {
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 3377ec1..5d22cc5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -20,7 +20,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.ListState;
@@ -491,7 +490,7 @@
this.kafkaProducersPoolSize = kafkaProducersPoolSize;
checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");
- ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
ClosureCleaner.ensureSerializable(serializationSchema);
// set the producer configuration properties for kafka record key value serializers.
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 034a50b..6cf1839 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -19,7 +19,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
@@ -297,7 +296,7 @@
throw new IllegalStateException("A periodic watermark emitter has already been set.");
}
try {
- ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(assigner, true);
this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
return this;
} catch (Exception e) {
@@ -332,7 +331,7 @@
throw new IllegalStateException("A punctuated watermark emitter has already been set.");
}
try {
- ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(assigner, true);
this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
return this;
} catch (Exception e) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index b23058c..3a1d1b6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -19,7 +19,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
@@ -144,7 +143,7 @@
requireNonNull(defaultTopicId, "TopicID not set");
requireNonNull(serializationSchema, "serializationSchema not set");
requireNonNull(producerConfig, "producerConfig not set");
- ClosureCleaner.clean(customPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(customPartitioner, true);
ClosureCleaner.ensureSerializable(serializationSchema);
this.defaultTopicId = defaultTopicId;
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 230c138..d7c8129 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -20,7 +20,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.ListState;
@@ -493,7 +492,7 @@
this.kafkaProducersPoolSize = kafkaProducersPoolSize;
checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");
- ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
ClosureCleaner.ensureSerializable(serializationSchema);
// set the producer configuration properties for kafka record key value serializers.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 8916c34..5b24ded 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -19,7 +19,6 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.state.ListState;
@@ -239,7 +238,7 @@
*/
public void setShardAssigner(KinesisShardAssigner shardAssigner) {
this.shardAssigner = checkNotNull(shardAssigner, "function can not be null");
- ClosureCleaner.clean(shardAssigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(shardAssigner, true);
}
public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
@@ -254,7 +253,7 @@
public void setPeriodicWatermarkAssigner(
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
this.periodicWatermarkAssigner = periodicWatermarkAssigner;
- ClosureCleaner.clean(this.periodicWatermarkAssigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
}
public WatermarkTracker getWatermarkTracker() {
@@ -268,7 +267,7 @@
*/
public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
this.watermarkTracker = watermarkTracker;
- ClosureCleaner.clean(this.watermarkTracker, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(this.watermarkTracker, true);
}
// ------------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index bd91b2d..abb9c5b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -53,6 +53,26 @@
private static final Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class);
/**
+ * Tries to clean the closure of the given object, if the object is a non-static inner class.
+ * This will use the default closure cleaner level of {@link ExecutionConfig.ClosureCleanerLevel#RECURSIVE}.
+ *
+ * @param func The object whose closure should be cleaned.
+ * @param checkSerializable Flag to indicate whether serializability should be checked after
+ * the closure cleaning attempt.
+ * @throws InvalidProgramException Thrown, if 'checkSerializable' is true, and the object was
+ * not serializable after the closure cleaning.
+ * @throws RuntimeException A RuntimeException may be thrown, if the code of the class could
+ * not be loaded, in order to process during the closure cleaning.
+ */
+ public static void clean(Object func, boolean checkSerializable) {
+ clean(
+ func,
+ ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
+ checkSerializable,
+ Collections.newSetFromMap(new IdentityHashMap<>()));
+ }
+
+ /**
* Tries to clean the closure of the given object, if the object is a non-static inner
* class.
*
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 7b93e8a..ec7d42e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -54,7 +54,7 @@
MapCreator creator = new NonSerializableMapCreator();
MapFunction<Integer, Integer> map = creator.getMap();
- ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(map, true);
int result = map.map(3);
Assert.assertEquals(result, 4);
@@ -65,7 +65,7 @@
MapCreator creator = new SerializableMapCreator(1);
MapFunction<Integer, Integer> map = creator.getMap();
- ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(map, true);
int result = map.map(3);
Assert.assertEquals(result, 4);
@@ -76,7 +76,7 @@
MapCreator creator = new NestedSerializableMapCreator(1);
MapFunction<Integer, Integer> map = creator.getMap();
- ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(map, true);
ClosureCleaner.ensureSerializable(map);
@@ -89,7 +89,7 @@
MapCreator creator = new NestedNonSerializableMapCreator(1);
MapFunction<Integer, Integer> map = creator.getMap();
- ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(map, true);
ClosureCleaner.ensureSerializable(map);
@@ -104,7 +104,7 @@
WrapperMapFunction wrapped = new WrapperMapFunction(notCleanedMap);
- ClosureCleaner.clean(wrapped, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(wrapped, true);
ClosureCleaner.ensureSerializable(wrapped);
@@ -116,7 +116,7 @@
public void testComplexTopLevelClassClean() throws Exception {
MapFunction<Integer, Integer> complexMap = new ComplexMap((MapFunction<Integer, Integer>) value -> value + 1);
- ClosureCleaner.clean(complexMap, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(complexMap, true);
int result = complexMap.map(3);
@@ -127,7 +127,7 @@
public void testComplexInnerClassClean() throws Exception {
MapFunction<Integer, Integer> complexMap = new InnerComplexMap((MapFunction<Integer, Integer>) value -> value + 1);
- ClosureCleaner.clean(complexMap, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(complexMap, true);
int result = complexMap.map(3);
@@ -137,7 +137,7 @@
@Test
public void testSelfReferencingClean() {
final NestedSelfReferencing selfReferencing = new NestedSelfReferencing();
- ClosureCleaner.clean(selfReferencing, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(selfReferencing, true);
}
class InnerCustomMap implements MapFunction<Integer, Integer> {
@@ -191,7 +191,7 @@
MapFunction<Integer, Integer> wrappedMap = new WrapperMapFunction(nestedMap);
- ClosureCleaner.clean(wrappedMap, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(wrappedMap, true);
ClosureCleaner.ensureSerializable(wrappedMap);
}
@@ -204,7 +204,7 @@
Tuple1<MapFunction<Integer, Integer>> tuple = new Tuple1<>(wrappedMap);
- ClosureCleaner.clean(tuple, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(tuple, true);
ClosureCleaner.ensureSerializable(tuple);
}
@@ -213,7 +213,7 @@
public void testRecursiveClass() {
RecursiveClass recursiveClass = new RecursiveClass(new RecursiveClass());
- ClosureCleaner.clean(recursiveClass, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(recursiveClass, true);
ClosureCleaner.ensureSerializable(recursiveClass);
}
@@ -230,7 +230,7 @@
public void testWriteReplaceRecursive() {
WithWriteReplace writeReplace = new WithWriteReplace(new WithWriteReplace.Payload("text"));
Assert.assertEquals("text", writeReplace.getPayload().getRaw());
- ClosureCleaner.clean(writeReplace, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(writeReplace, true);
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 49aaf6d..6423676 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -18,7 +18,6 @@
package org.apache.flink.cep.pattern;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
@@ -156,7 +155,7 @@
public Pattern<T, F> where(IterativeCondition<F> condition) {
Preconditions.checkNotNull(condition, "The condition cannot be null.");
- ClosureCleaner.clean(condition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(condition, true);
if (this.condition == null) {
this.condition = condition;
} else {
@@ -178,7 +177,7 @@
public Pattern<T, F> or(IterativeCondition<F> condition) {
Preconditions.checkNotNull(condition, "The condition cannot be null.");
- ClosureCleaner.clean(condition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(condition, true);
if (this.condition == null) {
this.condition = condition;
@@ -228,7 +227,7 @@
throw new MalformedPatternException("The until condition is only applicable to looping states.");
}
- ClosureCleaner.clean(untilCondition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(untilCondition, true);
this.untilCondition = untilCondition;
return this;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
index 709a582..c566743 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
@@ -18,15 +18,14 @@
package org.apache.flink.runtime.taskexecutor.rpc;
-import java.io.IOException;
-
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.util.InstantiationUtil;
+import java.io.IOException;
+
public class RpcGlobalAggregateManager implements GlobalAggregateManager {
private final JobMasterGateway jobMasterGateway;
@@ -38,7 +37,7 @@
@Override
public <IN, ACC, OUT> OUT updateGlobalAggregate(String aggregateName, Object aggregand, AggregateFunction<IN, ACC, OUT> aggregateFunction)
throws IOException {
- ClosureCleaner.clean(aggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE,true);
+ ClosureCleaner.clean(aggregateFunction, true);
byte[] serializedAggregateFunction = InstantiationUtil.serializeObject(aggregateFunction);
Object result = null;
try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f42030e..836bea3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1636,7 +1636,7 @@
AggregateFunction<Integer, Integer, Integer> aggregateFunction = createAggregateFunction();
- ClosureCleaner.clean(aggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(aggregateFunction, true);
byte[] serializedAggregateFunction = InstantiationUtil.serializeObject(aggregateFunction);
updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedAggregateFunction);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index d07b892..7ac0cf3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.tasks;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
@@ -139,7 +138,7 @@
public <K> void configureForKeyedStream(
KeySelector<IN, K> keySelector,
TypeInformation<K> keyType) {
- ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+ ClosureCleaner.clean(keySelector, false);
streamConfig.setStatePartitioner(0, keySelector);
streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig));
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 31b10f5..caf846f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.util;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
@@ -44,7 +43,7 @@
int subtaskIndex) throws Exception {
super(operator, maxParallelism, numSubtasks, subtaskIndex);
- ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+ ClosureCleaner.clean(keySelector, false);
config.setStatePartitioner(0, keySelector);
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
}
@@ -64,7 +63,7 @@
super(operator, environment);
- ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+ ClosureCleaner.clean(keySelector, false);
config.setStatePartitioner(0, keySelector);
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index eb6222f..c00e59a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.util;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
@@ -44,8 +43,8 @@
int subtaskIndex) throws Exception {
super(operator, maxParallelism, numSubtasks, subtaskIndex);
- ClosureCleaner.clean(keySelector1, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
- ClosureCleaner.clean(keySelector2, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+ ClosureCleaner.clean(keySelector1, false);
+ ClosureCleaner.clean(keySelector2, false);
config.setStatePartitioner(0, keySelector1);
config.setStatePartitioner(1, keySelector2);
config.setStateKeySerializer(keyType.createSerializer(executionConfig));