[FLINK-13586] Make ClosureCleaner.clean() backwards compatible with 1.8.0
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
index 47c55a1..b3948b2 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 
@@ -55,7 +54,7 @@
 
 	public CassandraCommitter(ClusterBuilder builder) {
 		this.builder = builder;
-		ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(builder, true);
 	}
 
 	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
index 4374deb..6b3d418 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
@@ -63,7 +62,7 @@
 		super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
 		this.insertQuery = insertQuery;
 		this.builder = builder;
-		ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(builder, true);
 	}
 
 	public void open() throws Exception {
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 0e7eb6f..5d758be 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.connectors.cassandra;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -65,7 +64,7 @@
 		this.builder = builder;
 		this.config = config;
 		this.failureHandler = Preconditions.checkNotNull(failureHandler);
-		ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(builder, true);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index c8992a7..b028055 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -63,7 +62,7 @@
 		super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
 		this.insertQuery = insertQuery;
 		this.builder = builder;
-		ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(builder, true);
 	}
 
 	public void open() throws Exception {
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 3377ec1..5d22cc5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -20,7 +20,6 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.ListState;
@@ -491,7 +490,7 @@
 		this.kafkaProducersPoolSize = kafkaProducersPoolSize;
 		checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");
 
-		ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
 		ClosureCleaner.ensureSerializable(serializationSchema);
 
 		// set the producer configuration properties for kafka record key value serializers.
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 034a50b..6cf1839 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -19,7 +19,6 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -297,7 +296,7 @@
 			throw new IllegalStateException("A periodic watermark emitter has already been set.");
 		}
 		try {
-			ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+			ClosureCleaner.clean(assigner, true);
 			this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
 			return this;
 		} catch (Exception e) {
@@ -332,7 +331,7 @@
 			throw new IllegalStateException("A punctuated watermark emitter has already been set.");
 		}
 		try {
-			ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+			ClosureCleaner.clean(assigner, true);
 			this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
 			return this;
 		} catch (Exception e) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index b23058c..3a1d1b6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -19,7 +19,6 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
@@ -144,7 +143,7 @@
 		requireNonNull(defaultTopicId, "TopicID not set");
 		requireNonNull(serializationSchema, "serializationSchema not set");
 		requireNonNull(producerConfig, "producerConfig not set");
-		ClosureCleaner.clean(customPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(customPartitioner, true);
 		ClosureCleaner.ensureSerializable(serializationSchema);
 
 		this.defaultTopicId = defaultTopicId;
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 230c138..d7c8129 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -20,7 +20,6 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.ListState;
@@ -493,7 +492,7 @@
 		this.kafkaProducersPoolSize = kafkaProducersPoolSize;
 		checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");
 
-		ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
 		ClosureCleaner.ensureSerializable(serializationSchema);
 
 		// set the producer configuration properties for kafka record key value serializers.
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 8916c34..5b24ded 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -19,7 +19,6 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.state.ListState;
@@ -239,7 +238,7 @@
 	 */
 	public void setShardAssigner(KinesisShardAssigner shardAssigner) {
 		this.shardAssigner = checkNotNull(shardAssigner, "function can not be null");
-		ClosureCleaner.clean(shardAssigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(shardAssigner, true);
 	}
 
 	public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
@@ -254,7 +253,7 @@
 	public void setPeriodicWatermarkAssigner(
 		AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
 		this.periodicWatermarkAssigner = periodicWatermarkAssigner;
-		ClosureCleaner.clean(this.periodicWatermarkAssigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
 	}
 
 	public WatermarkTracker getWatermarkTracker() {
@@ -268,7 +267,7 @@
 	 */
 	public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
 		this.watermarkTracker = watermarkTracker;
-		ClosureCleaner.clean(this.watermarkTracker, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(this.watermarkTracker, true);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index bd91b2d..abb9c5b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -53,6 +53,26 @@
 	private static final Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class);
 
 	/**
+	 * Tries to clean the closure of the given object, if the object is a non-static inner class.
+	 * This will use the default closure cleaner level of {@link ExecutionConfig.ClosureCleanerLevel#RECURSIVE}.
+	 *
+	 * @param func The object whose closure should be cleaned.
+	 * @param checkSerializable Flag to indicate whether serializability should be checked after
+	 * 		the closure cleaning attempt.
+	 * @throws InvalidProgramException Thrown, if 'checkSerializable' is true, and the object was
+	 * 		not serializable after the closure cleaning.
+	 * @throws RuntimeException A RuntimeException may be thrown, if the code of the class could
+	 * 		not be loaded, in order to process during the closure cleaning.
+	 */
+	public static void clean(Object func, boolean checkSerializable) {
+		clean(
+				func,
+				ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
+				checkSerializable,
+				Collections.newSetFromMap(new IdentityHashMap<>()));
+	}
+
+	/**
 	 * Tries to clean the closure of the given object, if the object is a non-static inner
 	 * class.
 	 *
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 7b93e8a..ec7d42e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -54,7 +54,7 @@
 		MapCreator creator = new NonSerializableMapCreator();
 		MapFunction<Integer, Integer> map = creator.getMap();
 
-		ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(map, true);
 
 		int result = map.map(3);
 		Assert.assertEquals(result, 4);
@@ -65,7 +65,7 @@
 		MapCreator creator = new SerializableMapCreator(1);
 		MapFunction<Integer, Integer> map = creator.getMap();
 
-		ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(map, true);
 
 		int result = map.map(3);
 		Assert.assertEquals(result, 4);
@@ -76,7 +76,7 @@
 		MapCreator creator = new NestedSerializableMapCreator(1);
 		MapFunction<Integer, Integer> map = creator.getMap();
 
-		ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(map, true);
 
 		ClosureCleaner.ensureSerializable(map);
 
@@ -89,7 +89,7 @@
 		MapCreator creator = new NestedNonSerializableMapCreator(1);
 		MapFunction<Integer, Integer> map = creator.getMap();
 
-		ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(map, true);
 
 		ClosureCleaner.ensureSerializable(map);
 
@@ -104,7 +104,7 @@
 
 		WrapperMapFunction wrapped = new WrapperMapFunction(notCleanedMap);
 
-		ClosureCleaner.clean(wrapped, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(wrapped, true);
 
 		ClosureCleaner.ensureSerializable(wrapped);
 
@@ -116,7 +116,7 @@
 	public void testComplexTopLevelClassClean() throws Exception {
 		MapFunction<Integer, Integer> complexMap = new ComplexMap((MapFunction<Integer, Integer>) value -> value + 1);
 
-		ClosureCleaner.clean(complexMap, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(complexMap, true);
 
 		int result = complexMap.map(3);
 
@@ -127,7 +127,7 @@
 	public void testComplexInnerClassClean() throws Exception {
 		MapFunction<Integer, Integer> complexMap = new InnerComplexMap((MapFunction<Integer, Integer>) value -> value + 1);
 
-		ClosureCleaner.clean(complexMap, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(complexMap, true);
 
 		int result = complexMap.map(3);
 
@@ -137,7 +137,7 @@
 	@Test
 	public void testSelfReferencingClean() {
 		final NestedSelfReferencing selfReferencing = new NestedSelfReferencing();
-		ClosureCleaner.clean(selfReferencing, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(selfReferencing, true);
 	}
 
 	class InnerCustomMap implements MapFunction<Integer, Integer> {
@@ -191,7 +191,7 @@
 
 		MapFunction<Integer, Integer> wrappedMap = new WrapperMapFunction(nestedMap);
 
-		ClosureCleaner.clean(wrappedMap, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(wrappedMap, true);
 
 		ClosureCleaner.ensureSerializable(wrappedMap);
 	}
@@ -204,7 +204,7 @@
 
 		Tuple1<MapFunction<Integer, Integer>> tuple = new Tuple1<>(wrappedMap);
 
-		ClosureCleaner.clean(tuple, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(tuple, true);
 
 		ClosureCleaner.ensureSerializable(tuple);
 	}
@@ -213,7 +213,7 @@
 	public void testRecursiveClass() {
 		RecursiveClass recursiveClass = new RecursiveClass(new RecursiveClass());
 
-		ClosureCleaner.clean(recursiveClass, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(recursiveClass, true);
 
 		ClosureCleaner.ensureSerializable(recursiveClass);
 	}
@@ -230,7 +230,7 @@
 	public void testWriteReplaceRecursive() {
 		WithWriteReplace writeReplace = new WithWriteReplace(new WithWriteReplace.Payload("text"));
 		Assert.assertEquals("text", writeReplace.getPayload().getRaw());
-		ClosureCleaner.clean(writeReplace, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(writeReplace, true);
 	}
 }
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 49aaf6d..6423676 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.pattern;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
@@ -156,7 +155,7 @@
 	public Pattern<T, F> where(IterativeCondition<F> condition) {
 		Preconditions.checkNotNull(condition, "The condition cannot be null.");
 
-		ClosureCleaner.clean(condition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(condition, true);
 		if (this.condition == null) {
 			this.condition = condition;
 		} else {
@@ -178,7 +177,7 @@
 	public Pattern<T, F> or(IterativeCondition<F> condition) {
 		Preconditions.checkNotNull(condition, "The condition cannot be null.");
 
-		ClosureCleaner.clean(condition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(condition, true);
 
 		if (this.condition == null) {
 			this.condition = condition;
@@ -228,7 +227,7 @@
 			throw new MalformedPatternException("The until condition is only applicable to looping states.");
 		}
 
-		ClosureCleaner.clean(untilCondition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+		ClosureCleaner.clean(untilCondition, true);
 		this.untilCondition = untilCondition;
 
 		return this;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
index 709a582..c566743 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.taskexecutor.rpc;
 
-import java.io.IOException;
-
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.util.InstantiationUtil;
 
+import java.io.IOException;
+
 public class RpcGlobalAggregateManager implements GlobalAggregateManager {
 
 	private final JobMasterGateway jobMasterGateway;
@@ -38,7 +37,7 @@
 	@Override
 	public <IN, ACC, OUT> OUT updateGlobalAggregate(String aggregateName, Object aggregand, AggregateFunction<IN, ACC, OUT> aggregateFunction)
 		throws IOException {
-		ClosureCleaner.clean(aggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE,true);
+		ClosureCleaner.clean(aggregateFunction, true);
 		byte[] serializedAggregateFunction = InstantiationUtil.serializeObject(aggregateFunction);
 		Object result = null;
 		try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f42030e..836bea3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1636,7 +1636,7 @@
 
 			AggregateFunction<Integer, Integer, Integer> aggregateFunction = createAggregateFunction();
 
-			ClosureCleaner.clean(aggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+			ClosureCleaner.clean(aggregateFunction, true);
 			byte[] serializedAggregateFunction = InstantiationUtil.serializeObject(aggregateFunction);
 
 			updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedAggregateFunction);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index d07b892..7ac0cf3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
@@ -139,7 +138,7 @@
 	public <K> void configureForKeyedStream(
 			KeySelector<IN, K> keySelector,
 			TypeInformation<K> keyType) {
-		ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+		ClosureCleaner.clean(keySelector, false);
 		streamConfig.setStatePartitioner(0, keySelector);
 		streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig));
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 31b10f5..caf846f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -44,7 +43,7 @@
 			int subtaskIndex) throws Exception {
 		super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
-		ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+		ClosureCleaner.clean(keySelector, false);
 		config.setStatePartitioner(0, keySelector);
 		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
 	}
@@ -64,7 +63,7 @@
 
 		super(operator, environment);
 
-		ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+		ClosureCleaner.clean(keySelector, false);
 		config.setStatePartitioner(0, keySelector);
 		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index eb6222f..c00e59a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -44,8 +43,8 @@
 			int subtaskIndex) throws Exception {
 		super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
-		ClosureCleaner.clean(keySelector1, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
-		ClosureCleaner.clean(keySelector2, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+		ClosureCleaner.clean(keySelector1, false);
+		ClosureCleaner.clean(keySelector2, false);
 		config.setStatePartitioner(0, keySelector1);
 		config.setStatePartitioner(1, keySelector2);
 		config.setStateKeySerializer(keyType.createSerializer(executionConfig));