[FLINK-31772][Connector/Kinesis] Adjusting Kinesis Ratelimiting strategy to fix performance regression (#70)
diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
index 2ff49cb..58d3d8a 100644
--- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
+++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
@@ -26,6 +26,9 @@
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -79,6 +82,9 @@
RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
getSdkClientMisconfiguredExceptionClassifier());
+ private static final int AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE = 10;
+ private static final double AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR = 0.99D;
+
private final Counter numRecordsOutErrorsCounter;
/* Name of the stream in Kinesis Data Streams */
@@ -152,6 +158,8 @@
.setMaxBufferedRequests(maxBufferedRequests)
.setMaxTimeInBufferMS(maxTimeInBufferMS)
.setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+ .setRateLimitingStrategy(
+ buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize))
.build(),
states);
this.failOnError = failOnError;
@@ -175,6 +183,19 @@
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
}
+ private static RateLimitingStrategy buildRateLimitingStrategy(
+ int maxInFlightRequests, int maxBatchSize) {
+ return CongestionControlRateLimitingStrategy.builder()
+ .setMaxInFlightRequests(maxInFlightRequests)
+ .setInitialMaxInFlightMessages(maxBatchSize)
+ .setScalingStrategy(
+ AIMDScalingStrategy.builder(maxBatchSize * maxInFlightRequests)
+ .setIncreaseRate(AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE)
+ .setDecreaseFactor(AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR)
+ .build())
+ .build();
+ }
+
@Override
protected void submitRequestEntries(
List<PutRecordsRequestEntry> requestEntries,
diff --git a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
new file mode 100644
index 0000000..eccfe0a
--- /dev/null
+++ b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import java.util.Properties;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test class for {@link KinesisStreamsSinkWriter}. */
+public class KinesisStreamsSinkWriterTest {
+
+ private static final int EXPECTED_AIMD_INC_RATE = 10;
+ private static final double EXPECTED_AIMD_DEC_FACTOR = 0.99D;
+ private static final int MAX_BATCH_SIZE = 50;
+ private static final int MAX_INFLIGHT_REQUESTS = 16;
+ private static final int MAX_BUFFERED_REQUESTS = 10000;
+ private static final long MAX_BATCH_SIZE_IN_BYTES = 4 * 1024 * 1024;
+ private static final long MAX_TIME_IN_BUFFER = 5000;
+ private static final long MAX_RECORD_SIZE = 1000 * 1024;
+ private static final boolean FAIL_ON_ERROR = false;
+
+ private KinesisStreamsSinkWriter<String> sinkWriter;
+
+ private static final ElementConverter<String, PutRecordsRequestEntry>
+ ELEMENT_CONVERTER_PLACEHOLDER =
+ KinesisStreamsSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
+ .build();
+
+ @Test
+ void testCreateKinesisStreamsSinkWriterInitializesRateLimitingStrategyWithExpectedParameters() {
+ Sink.InitContext sinkInitContext = new TestSinkInitContext();
+ Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
+ sinkWriter =
+ new KinesisStreamsSinkWriter<String>(
+ ELEMENT_CONVERTER_PLACEHOLDER,
+ sinkInitContext,
+ MAX_BATCH_SIZE,
+ MAX_INFLIGHT_REQUESTS,
+ MAX_BUFFERED_REQUESTS,
+ MAX_BATCH_SIZE_IN_BYTES,
+ MAX_TIME_IN_BUFFER,
+ MAX_RECORD_SIZE,
+ FAIL_ON_ERROR,
+ "streamName",
+ "StreamARN",
+ sinkProperties);
+
+ assertThat(sinkWriter)
+ .extracting("rateLimitingStrategy")
+ .isInstanceOf(CongestionControlRateLimitingStrategy.class);
+
+ assertThat(sinkWriter)
+ .extracting("rateLimitingStrategy")
+ .extracting("scalingStrategy")
+ .isInstanceOf(AIMDScalingStrategy.class);
+
+ assertThat(sinkWriter)
+ .extracting("rateLimitingStrategy")
+ .extracting("scalingStrategy")
+ .extracting("increaseRate")
+ .isEqualTo(EXPECTED_AIMD_INC_RATE);
+
+ assertThat(sinkWriter)
+ .extracting("rateLimitingStrategy")
+ .extracting("scalingStrategy")
+ .extracting("decreaseFactor")
+ .isEqualTo(EXPECTED_AIMD_DEC_FACTOR);
+ }
+}