[FLINK-36319][Connector/Prometheus] Multiple fixes.
Modified the default behavior for non-retriable errors to "discard and continue". Blocked the ability to set non-retriable error handling behavior to "fail".
Removed the ability to pass a custom AWS credential provider to the request signer, which caused a serialization exception.
Fixed bug in sink builder when retry configuration was not configured.
Refactored parameter validation to make it more robust.
diff --git a/flink-connector-prometheus-request-signer-amp/README.md b/flink-connector-prometheus-request-signer-amp/README.md
index ede09a1..75808df 100644
--- a/flink-connector-prometheus-request-signer-amp/README.md
+++ b/flink-connector-prometheus-request-signer-amp/README.md
@@ -1,10 +1,9 @@
## Request Signer for Amazon Managed Prometheus (AMP)
-Request signer implementation for Amazon Managed Prometheus (AMP)
+Request signer implementation for Amazon Managed Prometheus (AMP).
-The signer retrieves AWS credential using a `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`.
-The default `com.amazonaws.auth.DefaultAWSCredentialsProviderChain` works in most of the cases.
-Alternatively, you can pass an instance of `AwsCredentialsProvider` to the signer constructor.
+The signer retrieves AWS credentials using the default credential provider chain, which searches for credentials
+in the following order: ENV_VARS, SYS_PROPS, WEB_IDENTITY_TOKEN, PROFILE and EC2/ECS credentials provider.
The Flink application requires `RemoteWrite` permissions to the AMP workspace (e.g. `AmazonPromethusRemoteWriteAccess`
policy).
diff --git a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java
index 55ba7ae..5922b0e 100644
--- a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java
+++ b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.prometheus.sink.aws;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
import org.apache.flink.util.Preconditions;
@@ -46,32 +47,10 @@
private final URL remoteWriteUrl;
private final String awsRegion;
- private final AwsCredentialsProvider awsCredProvider;
-
- /**
- * Creates a signer instance with a specified credential provider.
- *
- * @param remoteWriteUrl URL of the remote-write endpoint
- * @param awsRegion Region of the AMP workspace
- * @param awsCredProvider implementation of AwsCredentialsProvider to retrieve the credentials
- */
- public AmazonManagedPrometheusWriteRequestSigner(
- String remoteWriteUrl, String awsRegion, AwsCredentialsProvider awsCredProvider) {
- Preconditions.checkArgument(
- StringUtils.isNotBlank(awsRegion), "awsRegion cannot be null or empty");
- Preconditions.checkArgument(
- StringUtils.isNotBlank(remoteWriteUrl), "remoteWriteUrl cannot be null or empty");
- Preconditions.checkArgument(awsCredProvider != null, "credentialsProvider cannot be null");
-
- this.awsRegion = awsRegion;
- try {
- this.remoteWriteUrl = new URL(remoteWriteUrl);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(
- "Invalid AMP remote-write URL: " + remoteWriteUrl, e);
- }
- this.awsCredProvider = awsCredProvider;
- }
+ // The credential provider cannot be created in the constructor or passed as parameter, because
+ // it is not serializable. Flink would fail serializing the sink instance when initializing the
+ // job.
+ private transient AwsCredentialsProvider credentialsProvider;
/**
* Creates a signer instance using the default AWS credentials provider chain.
@@ -80,7 +59,43 @@
* @param awsRegion Region of the AMP workspace
*/
public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, String awsRegion) {
- this(remoteWriteUrl, awsRegion, DefaultCredentialsProvider.create());
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(awsRegion), "awsRegion cannot be null or empty");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteWriteUrl), "remoteWriteUrl cannot be null or empty");
+
+ this.awsRegion = awsRegion;
+ try {
+ this.remoteWriteUrl = new URL(remoteWriteUrl);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(
+ "Invalid AMP remote-write URL: " + remoteWriteUrl, e);
+ }
+ }
+
+ /**
+ * Setting the credential provider explicitly is exposed, at package level only, for testing
+ * signature generation with different types of credentials. In the actual application, the
+ * credential provider must be initialized lazily, because AwsCredentialsProvider
+ * implementations are not serializable.
+ *
+ * @param credentialsProvider an instance of AwsCredentialsProvider
+ */
+ @VisibleForTesting
+ void setCredentialsProvider(AwsCredentialsProvider credentialsProvider) {
+ this.credentialsProvider = credentialsProvider;
+ }
+
+ /**
+ * Initialize the credentials provider lazily.
+ *
+ * @return an instance of DefaultCredentialsProvider.
+ */
+ private AwsCredentialsProvider getCredentialsProvider() {
+ if (credentialsProvider == null) {
+ credentialsProvider = DefaultCredentialsProvider.create();
+ }
+ return credentialsProvider;
}
/**
@@ -100,7 +115,7 @@
requestHeaders.put(X_AMZ_CONTENT_SHA_256, contentHashString);
// Get the credentials from the default credential provider chain
- AwsCredentials awsCreds = awsCredProvider.resolveCredentials();
+ AwsCredentials awsCreds = getCredentialsProvider().resolveCredentials();
// If the credentials are from a session, also get the session token
String sessionToken =
diff --git a/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerSerializationTest.java b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerSerializationTest.java
new file mode 100644
index 0000000..ce15e01
--- /dev/null
+++ b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerSerializationTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.util.InstantiationUtil.isSerializable;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AmazonManagedPrometheusWriteRequestSignerSerializationTest {
+
+ @Test
+ void shouldBeActuallySerializable() {
+ AmazonManagedPrometheusWriteRequestSigner signer =
+ new AmazonManagedPrometheusWriteRequestSigner(
+ "http://example.com/endpoint", "us-east-1");
+ assertTrue(isSerializable(signer), "The request signer should be serializable");
+ }
+}
diff --git a/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerTest.java b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerTest.java
index 67ccccf..fd768e6 100644
--- a/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerTest.java
+++ b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerTest.java
@@ -88,9 +88,9 @@
public void shouldAddExpectedHeaders_BasicCredentials() {
AmazonManagedPrometheusWriteRequestSigner signer =
new AmazonManagedPrometheusWriteRequestSigner(
- "https://example.com/endpoint",
- "us-east-1",
- new DummyAwsBasicCredentialsProvider("access-key-id", "secret-access-key"));
+ "https://example.com/endpoint", "us-east-1");
+ signer.setCredentialsProvider(
+ new DummyAwsBasicCredentialsProvider("access-key-id", "secret-access-key"));
Map<String, String> requestHeaders = new HashMap<>();
byte[] requestBody = "request-payload".getBytes(StandardCharsets.UTF_8);
@@ -109,10 +109,10 @@
public void shouldAddExpectedHeaders_SessionCredentials() {
AmazonManagedPrometheusWriteRequestSigner signer =
new AmazonManagedPrometheusWriteRequestSigner(
- "https://example.com/endpoint",
- "us-east-1",
- new DummAwsSessionCredentialProvider(
- "access-key-id", "secret-access-key", "session-key"));
+ "https://example.com/endpoint", "us-east-1");
+ signer.setCredentialsProvider(
+ new DummAwsSessionCredentialProvider(
+ "access-key-id", "secret-access-key", "session-key"));
Map<String, String> requestHeaders = new HashMap<>();
byte[] requestBody = "request-payload".getBytes(StandardCharsets.UTF_8);
diff --git a/flink-connector-prometheus/README.md b/flink-connector-prometheus/README.md
index 0343a1c..07f2308 100644
--- a/flink-connector-prometheus/README.md
+++ b/flink-connector-prometheus/README.md
@@ -13,11 +13,8 @@
For efficiency, the connector does not do any validation.
If input is out of order or malformed, **the write request is rejected by Prometheus**.
-If the write request is rejected, depending on the configured [error handling behaviour](#error-handling-behavior) for
-"Prometheus non-retriable errors", the sink will either throw an exception (`FAIL`, default behavior) or discard the
-offending
-request and continue (`DISCARD_AND_CONTINUE`). See [error handling behaviour](#error-handling-behavior), below, for
-further details.
+Currently, on such errors, the connector will discard the entire request containing the offending data, and continue.
+See [error handling behavior](#error-handling-behavior), below, for further details.
The sink receives as input time-series, each containing one or more samples.
To optimise the write throughput, input time-series are batched, in the order they are received, and written with a
@@ -25,9 +22,7 @@
If a write-request contains any out-of-order or malformed data, **the entire request is rejected** and all time series
are discarded.
-The reason is Remote-Write
-specifications [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) of
-rejected write requests (4xx responses).
+The reason is that the Remote-Write specification [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) of rejected write requests (4xx responses),
and the Prometheus response does not contain enough information to efficiently partially retry the write, discarding the
offending data.
@@ -67,11 +62,8 @@
.addLabel("InstanceID", instanceId)
.addLabel("AccountID", accountId);
-for(
-Tuple2<Double, Long> sample :samples){
- tsBuilder.
-
-addSample(sample.f0, sample.f1);
+for(Tuple2<Double, Long> sample :samples){
+ tsBuilder.addSample(sample.f0, sample.f1);
}
PrometheusTimeSeries ts = tsBuilder.build();
@@ -98,21 +90,20 @@
The sink batches multiple time-series into a single write-request, retaining the order..
Batching is based on the number of samples. Each write-request contains up to 500 samples, with a max buffering time of
-5 seconds
-(both configurable). The number of time-series doesn't matter.
+5 seconds (both configurable). The number of time-series doesn't matter.
-As by [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff),
-the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample ordering, and uses and exponential
-backoff.
+As per the [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff), the sink retries 5xx and 429 responses. Retrying is blocking, to
+retain sample ordering, and uses an exponential backoff.
The exponential backoff starts with an initial delay (default 30 ms) and increases it exponentially up to a max retry
delay (default 5 sec). It continues retrying until the max number of retries is reached (default reties forever).
-On non-retriable error response (4xx, except 429, non retryable exceptions), or on reaching the retry limit, depending
-on
-the configured [error handling behaviour](#error-handling-behavior) for "Max retries exceeded", the sink will either
-throw
-an exception (`FAIL`, default behaviour), or **discard the entire write-request**, log a warning and continue. See
+On non-retriable error response (4xx, except 429, non retryable exceptions) the sink will always discard and continue
+(`DISCARD_AND_CONTINUE` behavior; see details below).
+
+On reaching the retry limit, depending
+on the configured [error handling behaviour](#error-handling-behavior) for "Max retries exceeded", the sink will either
+throw an exception (`FAIL`, default behaviour), or **discard the entire write-request**, log a warning and continue. See
[error handling behaviour](#error-handling-behavior), below, for further details.
### Initializing the sink
@@ -134,8 +125,10 @@
.setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl) // Remote-write URL
.setRequestSigner(new AmazonManagedPrometheusWriteRequestSigner(prometheusRemoteWriteUrl, prometheusRegion)) // Optional request signed (AMP request signer in this example)
.setErrorHandlingBehaviourConfiguration(SinkWriterErrorHandlingBehaviorConfiguration.builder()
- // Error handling behaviors. See description below, for more details. All behaviors default to FAIL
- .onPrometheusNonRetriableError(OnErrorBehavior.FAIL)
+ // Error handling behaviors. See description below, for more details.
+ // Default is DISCARD_AND_CONTINUE for non-retriable errors
+ .onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE)
+ // Default is FAIL for other error types
.onMaxRetryExceeded(OnErrorBehavior.FAIL)
.onHttpClientIOFail(OnErrorBehavior.FAIL)
.build())
@@ -183,17 +176,17 @@
The behaviour of the sink, when an unrecoverable error happens while writing to Prometheus remote-write endpoint, is
configurable.
+
+The possible behaviors are:
+
+* `FAIL`: throw a `PrometheusSinkWriteException`, causing the job to fail.
+* `DISCARD_AND_CONTINUE`: log the reason of the error, discard the offending request, and continue.
+
There are 3 error conditions:
-1. Prometheus returns a non-retriable error response (i.e. any `4xx` status code except `429`)
-2. Prometheus returns a retriable error response (i.e. `5xx` or `429`) but the max retry limit is exceeded
-3. The http client fails to complete the request, for an I/O error
-
-The default behavior of the sink, for all these three error conditions, is to `FAIL`: throw
-a `PrometheusSinkWriteException`, causing the job to fail.
-
-Optionally, for each of these error conditions independently, the sink can be configured to `DISCARD_AND_CONTINUE`: "
-log, discard the offending request and continue".
+1. Prometheus returns a non-retriable error response (i.e. any `4xx` status code except `429`). Default: `DISCARD_AND_CONTINUE`.
+2. Prometheus returns a retriable error response (i.e. `5xx` or `429`) but the max retry limit is exceeded. Default: `FAIL`.
+3. The http client fails to complete the request, for an I/O error. Default: `FAIL`.
The error handling behaviors can be configured when creating the instance of the sink, as shown in this snipped:
@@ -219,6 +212,12 @@
request are causing the problem.
Prometheus does not return sufficient information to automatically handle partial requests.
+> In the current connector version, `DISCARD_AND_CONTINUE` is the only supported behavior for non-retriable errors.
+>
+> The behavior cannot be set to `FAIL`. Failing on a non-retriable error would make it impossible for the application to
+> restart from a checkpoint. The reason is that restarting from a checkpoint causes some duplicates, which are rejected by
+> Prometheus as out of order, causing in turn another non-retriable error, in an endless loop.
+
### Metrics
The sink exposes custom metrics, counting the samples and write-requests (batches) successfully written or discarded.
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
index 52a10ba..7a7738f 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
@@ -28,8 +28,11 @@
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.Collection;
/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
@@ -73,8 +76,22 @@
maxRecordSizeInSamples // maxRecordSizeInBytes
);
- Preconditions.checkArgument(maxInFlightRequests == 1, "maxInFlightRequests must be 1");
-
+ Preconditions.checkArgument(
+ maxBatchSizeInSamples > 0, "Max batch size (in samples) must be positive");
+ Preconditions.checkArgument(
+ maxRecordSizeInSamples <= maxBatchSizeInSamples,
+ "Max record size (in samples) must be <= Max batch size");
+ Preconditions.checkArgument(maxInFlightRequests == 1, "Max in-flight requests must be 1");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(prometheusRemoteWriteUrl),
+ "Missing or blank Prometheus Remote-Write URL");
+ checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(httpUserAgent), "Missing HTTP User Agent string");
+ Preconditions.checkNotNull(
+ errorHandlingBehaviorConfig, "Missing error handling configuration");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(metricGroupName), "Missing metric group name");
this.maxBatchSizeInSamples = maxBatchSizeInSamples;
this.requestSigner = requestSigner;
this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
@@ -147,4 +164,12 @@
getWriterStateSerializer() {
return new PrometheusStateSerializer();
}
+
+ private static void checkValidRemoteWriteUrl(String url) {
+ try {
+ new URL(url);
+ } catch (MalformedURLException mue) {
+ throw new IllegalArgumentException("Invalid Remote-Write URL: " + url, mue);
+ }
+ }
}
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
index 69eb72c..02b1e77 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
@@ -22,14 +22,10 @@
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
import org.apache.flink.connector.prometheus.sink.prometheus.Types;
-import org.apache.flink.util.Preconditions;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.util.Optional;
/** Builder for Sink implementation. */
@@ -74,7 +70,7 @@
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
int actualMaxRecordSizeInSamples =
- Optional.ofNullable(maxRecordSizeInSamples).orElse(maxBatchSizeInSamples);
+ Optional.ofNullable(maxRecordSizeInSamples).orElse(actualMaxBatchSizeInSamples);
int actualSocketTimeoutMs =
Optional.ofNullable(socketTimeoutMs)
@@ -92,25 +88,19 @@
.SinkWriterErrorHandlingBehaviorConfiguration
.DEFAULT_BEHAVIORS);
+ PrometheusSinkConfiguration.RetryConfiguration actualRetryConfiguration =
+ Optional.ofNullable(retryConfiguration)
+ .orElse(
+ PrometheusSinkConfiguration.RetryConfiguration
+ .DEFAULT_RETRY_CONFIGURATION);
+
String actualMetricGroupName =
Optional.ofNullable(metricGroupName).orElse(DEFAULT_METRIC_GROUP_NAME);
-
- Preconditions.checkArgument(
- StringUtils.isNotBlank(prometheusRemoteWriteUrl),
- "Missing or blank Prometheus Remote-Write URL");
- checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
- Preconditions.checkNotNull(retryConfiguration, "Missing retry configuration");
- Preconditions.checkArgument(
- actualMaxBatchSizeInSamples > 0, "Max batch size (in samples) must be positive");
- Preconditions.checkArgument(
- actualMaxRecordSizeInSamples <= actualMaxBatchSizeInSamples,
- "Max record size (in samples) must be <= Max batch size");
-
LOG.info(
"Prometheus sink configuration:"
+ "\n\t\tmaxBatchSizeInSamples={}\n\t\tmaxRecordSizeInSamples={}"
+ "\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}"
- + "\n\t\tinitialRetryDelayMs={}\n\t\tmaxRetryDelayMs={}\n\t\tmaxRetryCount={}"
+ + "\n\t\tRetryConfiguration: initialRetryDelayMs={}, maxRetryDelayMs={}, maxRetryCount={}"
+ "\n\t\tsocketTimeoutMs={}\n\t\thttpUserAgent={}"
+ "\n\t\tErrorHandlingBehaviour: onMaxRetryExceeded={}, onHttpClientIOFailure={}, onNonRetriableError={}",
actualMaxBatchSizeInSamples,
@@ -118,9 +108,9 @@
actualMaxTimeInBufferMS,
MAX_IN_FLIGHT_REQUESTS,
actualMaxBufferedRequests,
- retryConfiguration.getInitialRetryDelayMS(),
- retryConfiguration.getMaxRetryDelayMS(),
- retryConfiguration.getMaxRetryCount(),
+ actualRetryConfiguration.getInitialRetryDelayMS(),
+ actualRetryConfiguration.getMaxRetryDelayMS(),
+ actualRetryConfiguration.getMaxRetryCount(),
socketTimeoutMs,
actualHttpUserAgent,
actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(),
@@ -135,7 +125,7 @@
actualMaxRecordSizeInSamples,
actualMaxTimeInBufferMS,
prometheusRemoteWriteUrl,
- new PrometheusAsyncHttpClientBuilder(retryConfiguration)
+ new PrometheusAsyncHttpClientBuilder(actualRetryConfiguration)
.setSocketTimeout(actualSocketTimeoutMs),
requestSigner,
actualHttpUserAgent,
@@ -143,14 +133,6 @@
actualMetricGroupName);
}
- private static void checkValidRemoteWriteUrl(String url) {
- try {
- new URL(url);
- } catch (MalformedURLException mue) {
- throw new IllegalArgumentException("Invalid Remote-Write URL: " + url, mue);
- }
- }
-
public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) {
this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
return this;
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
index b939d09..2c7dd34 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
@@ -18,10 +18,12 @@
package org.apache.flink.connector.prometheus.sink;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.Optional;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.OnErrorBehavior.DISCARD_AND_CONTINUE;
import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.OnErrorBehavior.FAIL;
/** This class contains configuration classes for different components of the Prometheus sink. */
@@ -46,7 +48,7 @@
public static final OnErrorBehavior ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR = FAIL;
public static final OnErrorBehavior ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR = FAIL;
public static final OnErrorBehavior ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR =
- FAIL;
+ DISCARD_AND_CONTINUE;
/** Behaviour when the max retries is exceeded on Prometheus retriable errors. */
private final OnErrorBehavior onMaxRetryExceeded;
@@ -61,6 +63,13 @@
OnErrorBehavior onMaxRetryExceeded,
OnErrorBehavior onHttpClientIOFail,
OnErrorBehavior onPrometheusNonRetriableError) {
+ // onPrometheusNonRetriableError cannot be set to FAIL, because it makes it impossible for
+ // the job to restart from checkpoint (see FLINK-36319).
+ // We are retaining the possibility of configuring the behavior on this type of error to
+ // allow implementing a different type of behavior.
+ Preconditions.checkArgument(
+ onPrometheusNonRetriableError == DISCARD_AND_CONTINUE,
+ "Only DISCARD_AND_CONTINUE is currently supported for onPrometheusNonRetriableError");
this.onMaxRetryExceeded = onMaxRetryExceeded;
this.onHttpClientIOFail = onHttpClientIOFail;
this.onPrometheusNonRetriableError = onPrometheusNonRetriableError;
@@ -178,5 +187,8 @@
return new RetryConfiguration(initialRetryDelayMS, maxRetryDelayMS, maxRetryCount);
}
}
+
+ public static final RetryConfiguration DEFAULT_RETRY_CONFIGURATION =
+ new RetryConfiguration.Builder().build();
}
}
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
index 1a4ec3e..29c504e 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
@@ -59,6 +59,7 @@
int actualSocketTimeoutMs =
Optional.ofNullable(socketTimeoutMs).orElse(DEFAULT_SOCKET_TIMEOUT_MS);
+ Preconditions.checkNotNull(retryConfiguration, "Missing retry configuration");
Preconditions.checkArgument(
retryConfiguration.getInitialRetryDelayMS() >= 0,
"Initial retry delay must be >= 0");
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
index 758f488..4b98d63 100644
--- a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
@@ -30,6 +30,9 @@
import java.util.List;
import java.util.function.Consumer;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -57,12 +60,10 @@
errorHandlingBehavior =
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
.builder()
- .onMaxRetryExceeded(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
- .onHttpClientIOFail(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+ .onMaxRetryExceeded(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR)
+ .onHttpClientIOFail(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR)
.onPrometheusNonRetriableError(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+ ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR)
.build();
HttpResponseCallback callback =
@@ -85,33 +86,6 @@
}
@Test
- void shouldThrowExceptionOnCompletedWith404WhenFailOnNonRetriableIsSelected() {
- PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
- PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .onPrometheusNonRetriableError(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
- .build();
-
- HttpResponseCallback callback =
- new HttpResponseCallback(
- TIME_SERIES_COUNT,
- SAMPLE_COUNT,
- metricsCallback,
- errorHandlingBehavior,
- requestResults);
-
- SimpleHttpResponse httpResponse = new SimpleHttpResponse(HttpStatus.SC_NOT_FOUND);
-
- assertThrows(
- PrometheusSinkWriteException.class,
- () -> {
- callback.completed(httpResponse);
- });
- }
-
- @Test
void shouldIncFailCountersOnCompletedWith404WhenDiscardAndContinueOnNonRetriableIsSelected() {
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
errorHandlingBehavior =
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
index f3bddce..9fc4cfa 100644
--- a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
@@ -45,6 +45,9 @@
import static org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.assertCallbackCompletedOnceWithException;
import static org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.assertCallbackCompletedOnceWithNoException;
import static org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.getRequestResult;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR;
import static org.awaitility.Awaitility.await;
/**
@@ -111,17 +114,15 @@
int statusCode = 200;
serverWillRespond(status(statusCode));
- // Fail on any error
+ // Default behaviors for all errors
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
errorHandlingBehavior =
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
.builder()
- .onMaxRetryExceeded(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
- .onHttpClientIOFail(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+ .onMaxRetryExceeded(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR)
+ .onHttpClientIOFail(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR)
.onPrometheusNonRetriableError(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+ ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR)
.build();
VerifyableResponseCallback callback =
@@ -266,44 +267,6 @@
}
@Test
- void shouldNotRetryAndCompleteAndThrowExceptionOn404WhenFailOnNonRetriableIsSelected(
- WireMockRuntimeInfo wmRuntimeInfo) throws URISyntaxException, IOException {
- PrometheusAsyncHttpClientBuilder clientBuilder = getHttpClientBuilder(1);
-
- // 404,Not found is non-retriable for Prometheus remote-write
- int statusCode = 404;
- serverWillRespond(status(statusCode));
-
- // Fail on non-retriable
- PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
- PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .onPrometheusNonRetriableError(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
- .build();
-
- VerifyableResponseCallback callback =
- getResponseCallback(metricsCallback, errorHandlingBehavior);
-
- SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
-
- try (CloseableHttpAsyncClient client = clientBuilder.buildAndStartClient(metricsCallback)) {
- client.execute(request, callback);
-
- await().untilAsserted(
- () -> {
- // Check the client execute only one request
- verify(exactly(1), postRequestedFor(urlEqualTo("/remote_write")));
-
- // Verify the callback was completed once with an exception
- assertCallbackCompletedOnceWithException(
- PrometheusSinkWriteException.class, callback);
- });
- }
- }
-
- @Test
void shouldNotRetryCompleteAndThrowExceptionOn304(WireMockRuntimeInfo wmRuntimeInfo)
throws URISyntaxException, IOException {
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
new file mode 100644
index 0000000..f0639a0
--- /dev/null
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Unit tests for {@link PrometheusSinkBuilder}: valid builds and URL validation. */
+class PrometheusSinkBuilderTest {
+    private static final String ENDPOINT = "http://example.com:123/something";
+
+    @Test
+    void shouldBuildSinkOnlyProvidingPrometheusRemoteWriteUrl() {
+        PrometheusSink built =
+                (PrometheusSink)
+                        new PrometheusSinkBuilder().setPrometheusRemoteWriteUrl(ENDPOINT).build();
+        assertNotNull(built);
+    }
+
+    @Test
+    void shouldBuildSinkProvidingAllFields() {
+        // Error-handling configuration is assembled first to keep the builder chain readable
+        PrometheusSinkConfiguration.OnErrorBehavior fail =
+                PrometheusSinkConfiguration.OnErrorBehavior.FAIL;
+        PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration errorHandling =
+                PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.builder()
+                        .onHttpClientIOFail(fail)
+                        .onMaxRetryExceeded(fail)
+                        .onPrometheusNonRetriableError(
+                                PrometheusSinkConfiguration.OnErrorBehavior.DISCARD_AND_CONTINUE)
+                        .build();
+
+        PrometheusSink built =
+                (PrometheusSink)
+                        new PrometheusSinkBuilder()
+                                .setPrometheusRemoteWriteUrl(ENDPOINT)
+                                .setMaxBatchSizeInSamples(500)
+                                .setMaxRecordSizeInSamples(500)
+                                .setMaxTimeInBufferMS(5000)
+                                .setRetryConfiguration(
+                                        PrometheusSinkConfiguration.RetryConfiguration
+                                                .DEFAULT_RETRY_CONFIGURATION)
+                                .setSocketTimeoutMs(1000)
+                                .setRequestSigner(new DummyPrometheusRequestSigner())
+                                .setHttpUserAgent("test")
+                                .setErrorHandlingBehaviourConfiguration(errorHandling)
+                                .setMetricGroupName("test")
+                                .build();
+        assertNotNull(built);
+    }
+
+    @Test
+    void shouldFailIfPrometheusRemoteWriteUrlIsMissing() {
+        // No remote-write URL is set, so building is expected to be rejected
+        assertThrows(IllegalArgumentException.class, () -> new PrometheusSinkBuilder().build());
+    }
+
+    @Test
+    void shouldFailIfPrometheusRemoteWriteUrlIsInvalid() {
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> new PrometheusSinkBuilder().setPrometheusRemoteWriteUrl("invalid").build());
+    }
+}
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkSerializationTest.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkSerializationTest.java
new file mode 100644
index 0000000..a7bb6ef
--- /dev/null
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkSerializationTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.util.InstantiationUtil.isSerializable;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Checks that a {@link PrometheusSink} built from the URL alone is serializable. */
+class PrometheusSinkSerializationTest {
+    @Test
+    void shouldBeActuallySerializable() {
+        PrometheusSink sink =
+                (PrometheusSink)
+                        PrometheusSink.builder()
+                                .setPrometheusRemoteWriteUrl("http://example.com/endpoint")
+                                .build();
+
+        assertTrue(isSerializable(sink), "The sink object should be serializable");
+    }
+}
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
index be88e5c..ef644f7 100644
--- a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
@@ -20,6 +20,7 @@
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
class SinkWriterErrorHandlingBehaviorConfigurationTest {
@@ -44,9 +45,21 @@
}
@Test
- public void shouldDefaultToFailOnPrometheusNonRetriableError() {
+ public void shouldDefaultToDiscardAndContinueOnPrometheusNonRetriableError() {
assertEquals(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL,
+ PrometheusSinkConfiguration.OnErrorBehavior.DISCARD_AND_CONTINUE,
DEFAULT_CONFIG.getOnPrometheusNonRetriableError());
}
+
+ @Test
+ public void shouldPreventSettingContinueOnPrometheusNonRetriableErrorToFail() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+ .builder()
+ .onPrometheusNonRetriableError(
+ PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+ .build());
+ }
}