[FLINK-25222][tests] Remove NetworkFailuresProxy used in Kafka tests
We suspect that the NetworkFailuresProxy causes constant connectivity
problems with the brokers during testing, resulting in either network
timeouts or corrupted results.

Since the NetworkFailuresProxy is only used for testing the deprecated
FlinkKafkaProducer/Consumer, we can safely remove it: we will not add
new features to these connectors anyway.
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 2d6822b..ce7db35 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -29,7 +29,6 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -50,9 +49,7 @@
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -211,135 +208,6 @@
}
}
- /** Tests the at-least-once semantic for the simple writes into Kafka. */
- @Test
- public void testOneToOneAtLeastOnceRegularSink() throws Exception {
- testOneToOneAtLeastOnce(true);
- }
-
- /** Tests the at-least-once semantic for the simple writes into Kafka. */
- @Test
- public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
- testOneToOneAtLeastOnce(false);
- }
-
- /**
- * This test sets KafkaProducer so that it will not automatically flush the data and simulate
- * network failure between Flink and Kafka to check whether FlinkKafkaProducer flushed records
- * manually on snapshotState.
- *
- * <p>Due to legacy reasons there are two different ways of instantiating a Kafka 0.10 sink. The
- * parameter controls which method is used.
- */
- protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
- final String topic =
- regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
- final int partition = 0;
- final int numElements = 1000;
- final int failAfterElements = 333;
-
- createTestTopic(topic, 1, 1);
-
- TypeInformationSerializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(
- BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(500);
- env.setParallelism(1);
- env.setRestartStrategy(RestartStrategies.noRestart());
-
- Properties properties = new Properties();
- properties.putAll(standardProps);
- properties.putAll(secureProps);
- // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer
- // will try send pending (not flushed) data on close()
- properties.setProperty("timeout.ms", "10000");
- // KafkaProducer prior to KIP-91 (release 2.1) uses request timeout to expire the unsent
- // records.
- properties.setProperty("request.timeout.ms", "3000");
- // KafkaProducer in 2.1.0 and above uses delivery timeout to expire the records.
- properties.setProperty("delivery.timeout.ms", "5000");
- properties.setProperty("max.block.ms", "10000");
- // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events
- // instead of flushing them immediately
- properties.setProperty("batch.size", "10240000");
- properties.setProperty("linger.ms", "10000");
- // kafka producer messages guarantee
- properties.setProperty("retries", "3");
- properties.setProperty("acks", "all");
-
- BrokerRestartingMapper.resetState(kafkaServer::blockProxyTraffic);
-
- // process exactly failAfterElements number of elements and then shutdown Kafka broker and
- // fail application
- DataStream<Integer> inputStream =
- env.addSource(new InfiniteIntegerSource())
- .map(new BrokerRestartingMapper<>(failAfterElements));
-
- StreamSink<Integer> kafkaSink =
- kafkaServer.getProducerSink(
- topic,
- schema,
- properties,
- new FlinkKafkaPartitioner<Integer>() {
- @Override
- public int partition(
- Integer record,
- byte[] key,
- byte[] value,
- String targetTopic,
- int[] partitions) {
- return partition;
- }
- });
-
- if (regularSink) {
- inputStream.addSink(kafkaSink.getUserFunction());
- } else {
- kafkaServer.produceIntoKafka(
- inputStream,
- topic,
- schema,
- properties,
- new FlinkKafkaPartitioner<Integer>() {
- @Override
- public int partition(
- Integer record,
- byte[] key,
- byte[] value,
- String targetTopic,
- int[] partitions) {
- return partition;
- }
- });
- }
-
- try {
- env.execute("One-to-one at least once test");
- fail("Job should fail!");
- } catch (JobExecutionException ex) {
- // ignore error, it can be one of many errors so it would be hard to check the exception
- // message/cause
- } finally {
- kafkaServer.unblockProxyTraffic();
- }
-
- // assert that before failure we successfully snapshot/flushed all expected elements
- assertAtLeastOnceForTopic(
- properties,
- topic,
- partition,
- Collections.unmodifiableSet(
- new HashSet<>(
- getIntegersSequence(
- BrokerRestartingMapper
- .lastSnapshotedElementBeforeShutdown))),
- KAFKA_READ_TIMEOUT);
-
- deleteTestTopic(topic);
- }
-
/** Tests the exactly-once semantic for the simple writes into Kafka. */
@Test
public void testExactlyOnceRegularSink() throws Exception {
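For context on the test deleted above: it deliberately kept records buffered inside the KafkaProducer so that only the flush triggered by snapshotState could write them out, and it bounded how long unsent records survive on both pre- and post-KIP-91 client versions. A minimal sketch of that configuration, illustrative only and not part of this patch:

    import java.util.Properties;

    // Illustrative sketch: the producer settings the deleted test relied on.
    public final class AtLeastOnceProducerConfig {
        public static Properties buffered() {
            Properties properties = new Properties();
            // Batch aggressively instead of sending right away, so records only
            // reach Kafka when the sink flushes them on checkpoint.
            properties.setProperty("batch.size", "10240000");
            properties.setProperty("linger.ms", "10000");
            // Clients before 2.1 (pre KIP-91) expire unsent records via the
            // request timeout; 2.1+ clients use the delivery timeout instead,
            // so both are set to cover either client generation.
            properties.setProperty("request.timeout.ms", "3000");
            properties.setProperty("delivery.timeout.ms", "5000");
            // Upper bound for blocking calls such as send() and close().
            properties.setProperty("max.block.ms", "10000");
            properties.setProperty("retries", "3");
            properties.setProperty("acks", "all");
            return properties;
        }
    }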
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 42d8a61..5c1aa25 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -104,15 +104,11 @@
@BeforeClass
public static void prepare() throws Exception {
- prepare(true);
- }
-
- public static void prepare(boolean hideKafkaBehindProxy) throws Exception {
LOG.info("-------------------------------------------------------------------------");
LOG.info(" Starting KafkaTestBase ");
LOG.info("-------------------------------------------------------------------------");
- startClusters(false, hideKafkaBehindProxy);
+ startClusters(false);
}
@AfterClass
@@ -147,13 +143,11 @@
KafkaTestEnvironment.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS));
}
- public static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy)
- throws Exception {
+ public static void startClusters(boolean secureMode) throws Exception {
startClusters(
KafkaTestEnvironment.createConfig()
.setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
- .setSecureMode(secureMode)
- .setHideKafkaBehindProxy(hideKafkaBehindProxy));
+ .setSecureMode(secureMode));
}
public static void startClusters(KafkaTestEnvironment.Config environmentConfig)
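After this hunk, the test-base lifecycle no longer threads a proxy flag through its overloads. A hypothetical subclass (MySecureKafkaITCase is made up for illustration) would now start a secure cluster like this:

    // Sketch only: with hideKafkaBehindProxy gone, secureMode is the sole flag.
    public class MySecureKafkaITCase extends KafkaTestBase {
        public static void prepareSecure() throws Exception {
            startClusters(true); // previously startClusters(true, hideKafkaBehindProxy)
        }
    }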
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index c6fc932..4c9269f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -20,7 +20,6 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
-import org.apache.flink.networking.NetworkFailuresProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
@@ -31,7 +30,6 @@
import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -45,7 +43,6 @@
private int kafkaServersNumber = 1;
private Properties kafkaServerProperties = null;
private boolean secureMode = false;
- private boolean hideKafkaBehindProxy = false;
/** Please use {@link KafkaTestEnvironment#createConfig()} method. */
private Config() {}
@@ -77,31 +74,20 @@
return this;
}
- public boolean isHideKafkaBehindProxy() {
- return hideKafkaBehindProxy;
- }
-
public Config setHideKafkaBehindProxy(boolean hideKafkaBehindProxy) {
- this.hideKafkaBehindProxy = hideKafkaBehindProxy;
return this;
}
}
protected static final String KAFKA_HOST = "localhost";
- protected final List<NetworkFailuresProxy> networkFailuresProxies = new ArrayList<>();
-
public static Config createConfig() {
return new Config();
}
public abstract void prepare(Config config) throws Exception;
- public void shutdown() throws Exception {
- for (NetworkFailuresProxy proxy : networkFailuresProxies) {
- proxy.close();
- }
- }
+ public void shutdown() throws Exception {}
public abstract void deleteTestTopic(String topic);
@@ -225,24 +211,6 @@
public abstract boolean isSecureRunSupported();
- public void blockProxyTraffic() {
- for (NetworkFailuresProxy proxy : networkFailuresProxies) {
- proxy.blockTraffic();
- }
- }
-
- public void unblockProxyTraffic() {
- for (NetworkFailuresProxy proxy : networkFailuresProxies) {
- proxy.unblockTraffic();
- }
- }
-
- protected NetworkFailuresProxy createProxy(String remoteHost, int remotePort) {
- NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, remoteHost, remotePort);
- networkFailuresProxies.add(proxy);
- return proxy;
- }
-
protected void maybePrintDanglingThreadStacktrace(String threadNameKeyword) {
for (Map.Entry<Thread, StackTraceElement[]> threadEntry :
Thread.getAllStackTraces().entrySet()) {
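For readers unfamiliar with the helpers removed above: createProxy wired a NetworkFailuresProxy in front of each broker, and blockProxyTraffic/unblockProxyTraffic toggled connectivity mid-job. Roughly, the pattern looks like the following sketch; this is an assumption about the mechanism, not the real NetworkFailuresProxy implementation, which is more elaborate:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;

    // Sketch of the idea behind NetworkFailuresProxy: listen on a local port,
    // relay bytes to the broker, and "block traffic" by dropping connections.
    // Relaying is one-directional here for brevity; a real proxy pumps both ways.
    public final class BlockableTcpProxy implements AutoCloseable {
        private final ServerSocket server;
        private final String remoteHost;
        private final int remotePort;
        private volatile boolean blocked;

        public BlockableTcpProxy(int localPort, String remoteHost, int remotePort)
                throws IOException {
            this.server = new ServerSocket(localPort); // 0 picks a free port
            this.remoteHost = remoteHost;
            this.remotePort = remotePort;
            new Thread(this::acceptLoop, "proxy-accept").start();
        }

        public int getLocalPort() {
            return server.getLocalPort();
        }

        public void blockTraffic() {
            blocked = true; // live connections drop, new ones are accepted but stalled
        }

        public void unblockTraffic() {
            blocked = false;
        }

        private void acceptLoop() {
            while (!server.isClosed()) {
                try (Socket client = server.accept();
                        Socket broker = new Socket(remoteHost, remotePort)) {
                    pump(client.getInputStream(), broker.getOutputStream());
                } catch (IOException ignored) {
                    // Socket closed or broker unreachable; keep accepting.
                }
            }
        }

        private void pump(InputStream in, OutputStream out) throws IOException {
            byte[] buffer = new byte[8192];
            int read;
            // Once blocked flips, the loop exits and try-with-resources
            // closes both sockets, severing the connection mid-stream.
            while (!blocked && (read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }

        @Override
        public void close() throws IOException {
            server.close();
        }
    }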
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 4a164e5..8297bdd 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -23,7 +23,6 @@
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.networking.NetworkFailuresProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
@@ -401,7 +400,6 @@
// ignore
}
}
- super.shutdown();
}
protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
@@ -430,11 +428,6 @@
int kafkaPort = NetUtils.getAvailablePort();
kafkaProperties.put("port", Integer.toString(kafkaPort));
- if (config.isHideKafkaBehindProxy()) {
- NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort);
- kafkaProperties.put("advertised.port", proxy.getLocalPort());
- }
-
// to support secure kafka cluster
if (config.isSecureMode()) {
LOG.info("Adding Kafka secure configurations");