[FLINK-36319][Connector/Prometheus] Multiple fixes.
Modified the default behavior for non-retriable errors to "discard and continue". Blocked the ability to set non-retriable error handling behavior to "fail".
Removed ability to pass a custom AWS credential provider to the request signer, causing a serialization exception.
Fixed bug in sink builder when retry configuration was not configured.
Refactored parameter validation to make it more robust
diff --git a/flink-connector-prometheus-request-signer-amp/README.md b/flink-connector-prometheus-request-signer-amp/README.md
index ede09a1..75808df 100644
--- a/flink-connector-prometheus-request-signer-amp/README.md
+++ b/flink-connector-prometheus-request-signer-amp/README.md
@@ -1,10 +1,9 @@
 ## Request Signer for Amazon Managed Prometheus (AMP)
 
-Request signer implementation for Amazon Managed Prometheus (AMP)
+Request signer implementation for Amazon Managed Prometheus (AMP).
 
-The signer retrieves AWS credential using a `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`.
-The default `com.amazonaws.auth.DefaultAWSCredentialsProviderChain` works in most of the cases. 
-Alternatively, you can pass an instance of `AwsCredentialsProvider` to the signer constructor.
+The signer retrieves AWS credential using the default credential provider chain, that searches for credentials 
+in the following order: ENV_VARS, SYS_PROPS, WEB_IDENTITY_TOKEN, PROFILE and EC2/ECS credentials provider.
 
 The Flink application requires `RemoteWrite` permissions to the AMP workspace (e.g. `AmazonPromethusRemoteWriteAccess`
 policy).
diff --git a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java
index 55ba7ae..5922b0e 100644
--- a/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java
+++ b/flink-connector-prometheus-request-signer-amp/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.prometheus.sink.aws;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
 import org.apache.flink.util.Preconditions;
 
@@ -46,32 +47,10 @@
     private final URL remoteWriteUrl;
     private final String awsRegion;
 
-    private final AwsCredentialsProvider awsCredProvider;
-
-    /**
-     * Creates a signer instance with a specified credential provider.
-     *
-     * @param remoteWriteUrl URL of the remote-write endpoint
-     * @param awsRegion Region of the AMP workspace
-     * @param awsCredProvider implementation of AwsCredentialsProvider to retrieve the credentials
-     */
-    public AmazonManagedPrometheusWriteRequestSigner(
-            String remoteWriteUrl, String awsRegion, AwsCredentialsProvider awsCredProvider) {
-        Preconditions.checkArgument(
-                StringUtils.isNotBlank(awsRegion), "awsRegion cannot be null or empty");
-        Preconditions.checkArgument(
-                StringUtils.isNotBlank(remoteWriteUrl), "remoteWriteUrl cannot be null or empty");
-        Preconditions.checkArgument(awsCredProvider != null, "credentialsProvider cannot be null");
-
-        this.awsRegion = awsRegion;
-        try {
-            this.remoteWriteUrl = new URL(remoteWriteUrl);
-        } catch (MalformedURLException e) {
-            throw new IllegalArgumentException(
-                    "Invalid AMP remote-write URL: " + remoteWriteUrl, e);
-        }
-        this.awsCredProvider = awsCredProvider;
-    }
+    // The credential provider cannot be created in the constructor or passed as parameter, because
+    // it is not serializable. Flink would fail serializing the sink instance when initializing the
+    // job.
+    private transient AwsCredentialsProvider credentialsProvider;
 
     /**
      * Creates a signer instance using the default AWS credentials provider chain.
@@ -80,7 +59,43 @@
      * @param awsRegion Region of the AMP workspace
      */
     public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, String awsRegion) {
-        this(remoteWriteUrl, awsRegion, DefaultCredentialsProvider.create());
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(awsRegion), "awsRegion cannot be null or empty");
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(remoteWriteUrl), "remoteWriteUrl cannot be null or empty");
+
+        this.awsRegion = awsRegion;
+        try {
+            this.remoteWriteUrl = new URL(remoteWriteUrl);
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException(
+                    "Invalid AMP remote-write URL: " + remoteWriteUrl, e);
+        }
+    }
+
+    /**
+     * Setting the credential provider explicitly is exposed, at package level only, for testing
+     * signature generaiton with different types of credentials. In the actual application, the
+     * credential provider must be initialized lazily, because AwsCredentialsProvider
+     * implementations are not serializable.
+     *
+     * @param credentialsProvider an instance of AwsCredentialsProvider
+     */
+    @VisibleForTesting
+    void setCredentialsProvider(AwsCredentialsProvider credentialsProvider) {
+        this.credentialsProvider = credentialsProvider;
+    }
+
+    /**
+     * Initialize the credentials provider lazily.
+     *
+     * @return an instance of DefaultCredentialsProvider.
+     */
+    private AwsCredentialsProvider getCredentialsProvider() {
+        if (credentialsProvider == null) {
+            credentialsProvider = DefaultCredentialsProvider.create();
+        }
+        return credentialsProvider;
     }
 
     /**
@@ -100,7 +115,7 @@
         requestHeaders.put(X_AMZ_CONTENT_SHA_256, contentHashString);
 
         // Get the credentials from the default credential provider chain
-        AwsCredentials awsCreds = awsCredProvider.resolveCredentials();
+        AwsCredentials awsCreds = getCredentialsProvider().resolveCredentials();
 
         // If the credentials are from a session, also get the session token
         String sessionToken =
diff --git a/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerSerializationTest.java b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerSerializationTest.java
new file mode 100644
index 0000000..ce15e01
--- /dev/null
+++ b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerSerializationTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.util.InstantiationUtil.isSerializable;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AmazonManagedPrometheusWriteRequestSignerSerializationTest {
+
+    @Test
+    void shouldBeActuallySerializable() {
+        AmazonManagedPrometheusWriteRequestSigner signer =
+                new AmazonManagedPrometheusWriteRequestSigner(
+                        "http://example.com/endpoint", "us-east-1");
+        assertTrue(isSerializable(signer), "The request signer should be serializable");
+    }
+}
diff --git a/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerTest.java b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerTest.java
index 67ccccf..fd768e6 100644
--- a/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerTest.java
+++ b/flink-connector-prometheus-request-signer-amp/src/test/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSignerTest.java
@@ -88,9 +88,9 @@
     public void shouldAddExpectedHeaders_BasicCredentials() {
         AmazonManagedPrometheusWriteRequestSigner signer =
                 new AmazonManagedPrometheusWriteRequestSigner(
-                        "https://example.com/endpoint",
-                        "us-east-1",
-                        new DummyAwsBasicCredentialsProvider("access-key-id", "secret-access-key"));
+                        "https://example.com/endpoint", "us-east-1");
+        signer.setCredentialsProvider(
+                new DummyAwsBasicCredentialsProvider("access-key-id", "secret-access-key"));
 
         Map<String, String> requestHeaders = new HashMap<>();
         byte[] requestBody = "request-payload".getBytes(StandardCharsets.UTF_8);
@@ -109,10 +109,10 @@
     public void shouldAddExpectedHeaders_SessionCredentials() {
         AmazonManagedPrometheusWriteRequestSigner signer =
                 new AmazonManagedPrometheusWriteRequestSigner(
-                        "https://example.com/endpoint",
-                        "us-east-1",
-                        new DummAwsSessionCredentialProvider(
-                                "access-key-id", "secret-access-key", "session-key"));
+                        "https://example.com/endpoint", "us-east-1");
+        signer.setCredentialsProvider(
+                new DummAwsSessionCredentialProvider(
+                        "access-key-id", "secret-access-key", "session-key"));
 
         Map<String, String> requestHeaders = new HashMap<>();
         byte[] requestBody = "request-payload".getBytes(StandardCharsets.UTF_8);
diff --git a/flink-connector-prometheus/README.md b/flink-connector-prometheus/README.md
index 0343a1c..07f2308 100644
--- a/flink-connector-prometheus/README.md
+++ b/flink-connector-prometheus/README.md
@@ -13,11 +13,8 @@
 
 For efficiency, the connector does not do any validation.
 If input is out of order or malformed, **the write request is rejected by Prometheus**.
-If the write request is rejected, depending on the configured [error handling behaviour](#error-handling-behavior) for
-"Prometheus non-retriable errors", the sink will either throw an exception (`FAIL`, default behavior) or discard the
-offending
-request and continue (`DISCARD_AND_CONTINUE`). See [error handling behaviour](#error-handling-behavior), below, for
-further details.
+Currently, on such errors, the connector will discard the entire request containing the offending data, and continue.
+See [error handling behavior](#error-handling-behavior), below, for further details.
 
 The sink receives as input time-series, each containing one or more samples.
 To optimise the write throughput, input time-series are batched, in the order they are received, and written with a
@@ -25,9 +22,7 @@
 
 If a write-request contains any out-of-order or malformed data, **the entire request is rejected** and all time series
 are discarded.
-The reason is Remote-Write
-specifications [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) of
-rejected write requests (4xx responses).
+The reason is Remote-Write specifications [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) of rejected write requests (4xx responses).
 and the Prometheus response does not contain enough information to efficiently partially retry the write, discarding the
 offending data.
 
@@ -67,11 +62,8 @@
         .addLabel("InstanceID", instanceId)
         .addLabel("AccountID", accountId);
     
-for(
-Tuple2<Double, Long> sample :samples){
-        tsBuilder.
-
-addSample(sample.f0, sample.f1);
+for(Tuple2<Double, Long> sample :samples){
+        tsBuilder.addSample(sample.f0, sample.f1);
 }
 
 PrometheusTimeSeries ts = tsBuilder.build();
@@ -98,21 +90,20 @@
 The sink batches multiple time-series into a single write-request, retaining the order..
 
 Batching is based on the number of samples. Each write-request contains up to 500 samples, with a max buffering time of
-5 seconds
-(both configurable). The number of time-series doesn't matter.
+5 seconds (both configurable). The number of time-series doesn't matter.
 
-As by [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff),
-the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample ordering, and uses and exponential
-backoff.
+As by [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff), the sink retries 5xx and 429 responses. Retrying is blocking, to 
+retain sample ordering, and uses and exponential backoff.
 
 The exponential backoff starts with an initial delay (default 30 ms) and increases it exponentially up to a max retry
 delay (default 5 sec). It continues retrying until the max number of retries is reached (default reties forever).
 
-On non-retriable error response (4xx, except 429, non retryable exceptions), or on reaching the retry limit, depending
-on
-the configured [error handling behaviour](#error-handling-behavior) for "Max retries exceeded", the sink will either
-throw
-an exception (`FAIL`, default behaviour), or **discard the entire write-request**, log a warning and continue. See
+On non-retriable error response (4xx, except 429, non retryable exceptions) the sink will always discard and continue 
+(`DISCARD_AND_CONTINUE` behavior; see details below).
+
+On reaching the retry limit, depending
+on the configured [error handling behaviour](#error-handling-behavior) for "Max retries exceeded", the sink will either
+throw an exception (`FAIL`, default behaviour), or **discard the entire write-request**, log a warning and continue. See
 [error handling behaviour](#error-handling-behavior), below, for further details.
 
 ### Initializing the sink
@@ -134,8 +125,10 @@
         .setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl)  // Remote-write URL
         .setRequestSigner(new AmazonManagedPrometheusWriteRequestSigner(prometheusRemoteWriteUrl, prometheusRegion)) // Optional request signed (AMP request signer in this example)
         .setErrorHandlingBehaviourConfiguration(SinkWriterErrorHandlingBehaviorConfiguration.builder()
-                // Error handling behaviors. See description below, for more details. All behaviors default to FAIL   
-                .onPrometheusNonRetriableError(OnErrorBehavior.FAIL)
+                // Error handling behaviors. See description below, for more details.
+                // Default is DISCARD_AND_CONTINUE for non-retriable errors
+                .onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE) 
+                // Default is FAIL for other error types
                 .onMaxRetryExceeded(OnErrorBehavior.FAIL)
                 .onHttpClientIOFail(OnErrorBehavior.FAIL)
                 .build())
@@ -183,17 +176,17 @@
 The behaviour of the sink, when an unrecoverable error happens while writing to Prometheus remote-write endpoint, is
 configurable.
 
+
+The possible behaviors are:
+
+* `FAIL`: throw a `PrometheusSinkWriteException`, causing the job to fail.
+* `DISCARD_AND_CONTINUE`: log the reason of the error, discard the offending request, and continue.
+
 There are 3 error conditions:
 
-1. Prometheus returns a non-retriable error response (i.e. any `4xx` status code except `429`)
-2. Prometheus returns a retriable error response (i.e. `5xx` or `429`) but the max retry limit is exceeded
-3. The http client fails to complete the request, for an I/O error
-
-The default behavior of the sink, for all these three error conditions, is to `FAIL`: throw
-a `PrometheusSinkWriteException`, causing the job to fail.
-
-Optionally, for each of these error conditions independently, the sink can be configured to `DISCARD_AND_CONTINUE`: "
-log, discard the offending request and continue".
+1. Prometheus returns a non-retriable error response (i.e. any `4xx` status code except `429`). Default: `DISCARD_AND_CONTINUE`.
+2. Prometheus returns a retriable error response (i.e. `5xx` or `429`) but the max retry limit is exceeded. Default: `FAIL`.
+3. The http client fails to complete the request, for an I/O error. Default: `FAIL`.
 
 The error handling behaviors can be configured when creating the instance of the sink, as shown in this snipped:
 
@@ -219,6 +212,12 @@
 request are causing the problem.
 Prometheus does not return sufficient information to automatically handle partial requests.
 
+> In the current connector version, `DISCARD_AND_CONTINUE` is the only supported behavior for non-retriable error.
+> 
+> The behavior cannot be set to `FAIL`. Failing on non-retriable error would make impossible for the application to
+> restart from checkpoint. The reason is that restarting from checkpoint cause some duplicates, that are rejected by
+> Prometheus as out of order, causing in turn another non-retriable error, in an endless loop.
+
 ### Metrics
 
 The sink exposes custom metrics, counting the samples and write-requests (batches) successfully written or discarded.
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
index 52a10ba..7a7738f 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
@@ -28,8 +28,11 @@
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
 
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.Collection;
 
 /** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
@@ -73,8 +76,22 @@
                 maxRecordSizeInSamples // maxRecordSizeInBytes
                 );
 
-        Preconditions.checkArgument(maxInFlightRequests == 1, "maxInFlightRequests must be 1");
-
+        Preconditions.checkArgument(
+                maxBatchSizeInSamples > 1, "Max batch size (in samples) must be positive");
+        Preconditions.checkArgument(
+                maxRecordSizeInSamples <= maxBatchSizeInSamples,
+                "Maz record size (in samples) must be <= Max batch size");
+        Preconditions.checkArgument(maxInFlightRequests == 1, "Max in-flight requests must be 1");
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(prometheusRemoteWriteUrl),
+                "Missing or blank Prometheus Remote-Write URL");
+        checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(httpUserAgent), "Missing HTTP User Agent string");
+        Preconditions.checkNotNull(
+                errorHandlingBehaviorConfig, "Missing error handling configuration");
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(metricGroupName), "Missing metric group name");
         this.maxBatchSizeInSamples = maxBatchSizeInSamples;
         this.requestSigner = requestSigner;
         this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
@@ -147,4 +164,12 @@
             getWriterStateSerializer() {
         return new PrometheusStateSerializer();
     }
+
+    private static void checkValidRemoteWriteUrl(String url) {
+        try {
+            new URL(url);
+        } catch (MalformedURLException mue) {
+            throw new IllegalArgumentException("Invalid Remote-Write URL: " + url, mue);
+        }
+    }
 }
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
index 69eb72c..02b1e77 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
@@ -22,14 +22,10 @@
 import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
 import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
 import org.apache.flink.connector.prometheus.sink.prometheus.Types;
-import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.util.Optional;
 
 /** Builder for Sink implementation. */
@@ -74,7 +70,7 @@
                 Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
 
         int actualMaxRecordSizeInSamples =
-                Optional.ofNullable(maxRecordSizeInSamples).orElse(maxBatchSizeInSamples);
+                Optional.ofNullable(maxRecordSizeInSamples).orElse(actualMaxBatchSizeInSamples);
 
         int actualSocketTimeoutMs =
                 Optional.ofNullable(socketTimeoutMs)
@@ -92,25 +88,19 @@
                                                 .SinkWriterErrorHandlingBehaviorConfiguration
                                                 .DEFAULT_BEHAVIORS);
 
+        PrometheusSinkConfiguration.RetryConfiguration actualRetryConfiguration =
+                Optional.ofNullable(retryConfiguration)
+                        .orElse(
+                                PrometheusSinkConfiguration.RetryConfiguration
+                                        .DEFAULT_RETRY_CONFIGURATION);
+
         String actualMetricGroupName =
                 Optional.ofNullable(metricGroupName).orElse(DEFAULT_METRIC_GROUP_NAME);
-
-        Preconditions.checkArgument(
-                StringUtils.isNotBlank(prometheusRemoteWriteUrl),
-                "Missing or blank Prometheus Remote-Write URL");
-        checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
-        Preconditions.checkNotNull(retryConfiguration, "Missing retry configuration");
-        Preconditions.checkArgument(
-                actualMaxBatchSizeInSamples > 0, "Max batch size (in samples) must be positive");
-        Preconditions.checkArgument(
-                actualMaxRecordSizeInSamples <= actualMaxBatchSizeInSamples,
-                "Max record size (in samples) must be <= Max batch size");
-
         LOG.info(
                 "Prometheus sink configuration:"
                         + "\n\t\tmaxBatchSizeInSamples={}\n\t\tmaxRecordSizeInSamples={}"
                         + "\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}"
-                        + "\n\t\tinitialRetryDelayMs={}\n\t\tmaxRetryDelayMs={}\n\t\tmaxRetryCount={}"
+                        + "\n\t\tRetryConfiguration: initialRetryDelayMs={}, maxRetryDelayMs={}, maxRetryCount={}"
                         + "\n\t\tsocketTimeoutMs={}\n\t\thttpUserAgent={}"
                         + "\n\t\tErrorHandlingBehaviour: onMaxRetryExceeded={}, onHttpClientIOFailure={}, onNonRetriableError={}",
                 actualMaxBatchSizeInSamples,
@@ -118,9 +108,9 @@
                 actualMaxTimeInBufferMS,
                 MAX_IN_FLIGHT_REQUESTS,
                 actualMaxBufferedRequests,
-                retryConfiguration.getInitialRetryDelayMS(),
-                retryConfiguration.getMaxRetryDelayMS(),
-                retryConfiguration.getMaxRetryCount(),
+                actualRetryConfiguration.getInitialRetryDelayMS(),
+                actualRetryConfiguration.getMaxRetryDelayMS(),
+                actualRetryConfiguration.getMaxRetryCount(),
                 socketTimeoutMs,
                 actualHttpUserAgent,
                 actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(),
@@ -135,7 +125,7 @@
                 actualMaxRecordSizeInSamples,
                 actualMaxTimeInBufferMS,
                 prometheusRemoteWriteUrl,
-                new PrometheusAsyncHttpClientBuilder(retryConfiguration)
+                new PrometheusAsyncHttpClientBuilder(actualRetryConfiguration)
                         .setSocketTimeout(actualSocketTimeoutMs),
                 requestSigner,
                 actualHttpUserAgent,
@@ -143,14 +133,6 @@
                 actualMetricGroupName);
     }
 
-    private static void checkValidRemoteWriteUrl(String url) {
-        try {
-            new URL(url);
-        } catch (MalformedURLException mue) {
-            throw new IllegalArgumentException("Invalid Remote-Write URL: " + url, mue);
-        }
-    }
-
     public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) {
         this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
         return this;
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
index b939d09..2c7dd34 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
@@ -18,10 +18,12 @@
 package org.apache.flink.connector.prometheus.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.Optional;
 
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.OnErrorBehavior.DISCARD_AND_CONTINUE;
 import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.OnErrorBehavior.FAIL;
 
 /** This class contains configuration classes for different components of the Prometheus sink. */
@@ -46,7 +48,7 @@
         public static final OnErrorBehavior ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR = FAIL;
         public static final OnErrorBehavior ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR = FAIL;
         public static final OnErrorBehavior ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR =
-                FAIL;
+                DISCARD_AND_CONTINUE;
 
         /** Behaviour when the max retries is exceeded on Prometheus retriable errors. */
         private final OnErrorBehavior onMaxRetryExceeded;
@@ -61,6 +63,13 @@
                 OnErrorBehavior onMaxRetryExceeded,
                 OnErrorBehavior onHttpClientIOFail,
                 OnErrorBehavior onPrometheusNonRetriableError) {
+            // onPrometheusNonRetriableError cannot be set to FAIL, because it makes impossible for
+            // the job to restart from checkpoint (see FLINK-36319).
+            // We are retaining the possibility of configuring the behavior on this type of error to
+            // allow implementing a different type of behavior.
+            Preconditions.checkArgument(
+                    onPrometheusNonRetriableError == DISCARD_AND_CONTINUE,
+                    "Only DISCARD_AND_CONTINUE is currently supported for onPrometheusNonRetriableError");
             this.onMaxRetryExceeded = onMaxRetryExceeded;
             this.onHttpClientIOFail = onHttpClientIOFail;
             this.onPrometheusNonRetriableError = onPrometheusNonRetriableError;
@@ -178,5 +187,8 @@
                 return new RetryConfiguration(initialRetryDelayMS, maxRetryDelayMS, maxRetryCount);
             }
         }
+
+        public static final RetryConfiguration DEFAULT_RETRY_CONFIGURATION =
+                new RetryConfiguration.Builder().build();
     }
 }
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
index 1a4ec3e..29c504e 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
@@ -59,6 +59,7 @@
         int actualSocketTimeoutMs =
                 Optional.ofNullable(socketTimeoutMs).orElse(DEFAULT_SOCKET_TIMEOUT_MS);
 
+        Preconditions.checkNotNull(retryConfiguration, "Missing retry configuration");
         Preconditions.checkArgument(
                 retryConfiguration.getInitialRetryDelayMS() >= 0,
                 "Initial retry delay must be >= 0");
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
index 758f488..4b98d63 100644
--- a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
@@ -30,6 +30,9 @@
 import java.util.List;
 import java.util.function.Consumer;
 
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -57,12 +60,10 @@
                 errorHandlingBehavior =
                         PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
                                 .builder()
-                                .onMaxRetryExceeded(
-                                        PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
-                                .onHttpClientIOFail(
-                                        PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+                                .onMaxRetryExceeded(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR)
+                                .onHttpClientIOFail(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR)
                                 .onPrometheusNonRetriableError(
-                                        PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+                                        ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR)
                                 .build();
 
         HttpResponseCallback callback =
@@ -85,33 +86,6 @@
     }
 
     @Test
-    void shouldThrowExceptionOnCompletedWith404WhenFailOnNonRetriableIsSelected() {
-        PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .onPrometheusNonRetriableError(
-                                        PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
-                                .build();
-
-        HttpResponseCallback callback =
-                new HttpResponseCallback(
-                        TIME_SERIES_COUNT,
-                        SAMPLE_COUNT,
-                        metricsCallback,
-                        errorHandlingBehavior,
-                        requestResults);
-
-        SimpleHttpResponse httpResponse = new SimpleHttpResponse(HttpStatus.SC_NOT_FOUND);
-
-        assertThrows(
-                PrometheusSinkWriteException.class,
-                () -> {
-                    callback.completed(httpResponse);
-                });
-    }
-
-    @Test
     void shouldIncFailCountersOnCompletedWith404WhenDiscardAndContinueOnNonRetriableIsSelected() {
         PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
                 errorHandlingBehavior =
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
index f3bddce..9fc4cfa 100644
--- a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
@@ -45,6 +45,9 @@
 import static org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.assertCallbackCompletedOnceWithException;
 import static org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.assertCallbackCompletedOnceWithNoException;
 import static org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.getRequestResult;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR;
+import static org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR;
 import static org.awaitility.Awaitility.await;
 
 /**
@@ -111,17 +114,15 @@
         int statusCode = 200;
         serverWillRespond(status(statusCode));
 
-        // Fail on any error
+        // Default behaviors for all errors
         PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
                 errorHandlingBehavior =
                         PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
                                 .builder()
-                                .onMaxRetryExceeded(
-                                        PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
-                                .onHttpClientIOFail(
-                                        PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+                                .onMaxRetryExceeded(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR)
+                                .onHttpClientIOFail(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR)
                                 .onPrometheusNonRetriableError(
-                                        PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+                                        ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR)
                                 .build();
 
         VerifyableResponseCallback callback =
@@ -266,44 +267,6 @@
     }
 
     @Test
-    void shouldNotRetryAndCompleteAndThrowExceptionOn404WhenFailOnNonRetriableIsSelected(
-            WireMockRuntimeInfo wmRuntimeInfo) throws URISyntaxException, IOException {
-        PrometheusAsyncHttpClientBuilder clientBuilder = getHttpClientBuilder(1);
-
-        // 404,Not found is non-retriable for Prometheus remote-write
-        int statusCode = 404;
-        serverWillRespond(status(statusCode));
-
-        // Fail on non-retriable
-        PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .onPrometheusNonRetriableError(
-                                        PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
-                                .build();
-
-        VerifyableResponseCallback callback =
-                getResponseCallback(metricsCallback, errorHandlingBehavior);
-
-        SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
-
-        try (CloseableHttpAsyncClient client = clientBuilder.buildAndStartClient(metricsCallback)) {
-            client.execute(request, callback);
-
-            await().untilAsserted(
-                            () -> {
-                                // Check the client execute only one request
-                                verify(exactly(1), postRequestedFor(urlEqualTo("/remote_write")));
-
-                                // Verify the callback was completed once with an exception
-                                assertCallbackCompletedOnceWithException(
-                                        PrometheusSinkWriteException.class, callback);
-                            });
-        }
-    }
-
-    @Test
     void shouldNotRetryCompleteAndThrowExceptionOn304(WireMockRuntimeInfo wmRuntimeInfo)
             throws URISyntaxException, IOException {
 
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
new file mode 100644
index 0000000..f0639a0
--- /dev/null
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class PrometheusSinkBuilderTest {
+    private static final String ENDPOINT = "http://example.com:123/something";
+
+    @Test
+    void shouldBuildSinkOnlyProvidingPrometheusRemoteWriteUrl() {
+        PrometheusSink sink =
+                (PrometheusSink)
+                        new PrometheusSinkBuilder().setPrometheusRemoteWriteUrl(ENDPOINT).build();
+        assertNotNull(sink);
+    }
+
+    @Test
+    void shouldBuildSinkProvidingAllFields() {
+        PrometheusSink sink =
+                (PrometheusSink)
+                        new PrometheusSinkBuilder()
+                                .setPrometheusRemoteWriteUrl(ENDPOINT)
+                                .setMaxBatchSizeInSamples(500)
+                                .setMaxRecordSizeInSamples(500)
+                                .setMaxTimeInBufferMS(5000)
+                                .setRetryConfiguration(
+                                        PrometheusSinkConfiguration.RetryConfiguration
+                                                .DEFAULT_RETRY_CONFIGURATION)
+                                .setSocketTimeoutMs(1000)
+                                .setRequestSigner(new DummyPrometheusRequestSigner())
+                                .setHttpUserAgent("test")
+                                .setErrorHandlingBehaviourConfiguration(
+                                        PrometheusSinkConfiguration
+                                                .SinkWriterErrorHandlingBehaviorConfiguration
+                                                .builder()
+                                                .onHttpClientIOFail(
+                                                        PrometheusSinkConfiguration.OnErrorBehavior
+                                                                .FAIL)
+                                                .onMaxRetryExceeded(
+                                                        PrometheusSinkConfiguration.OnErrorBehavior
+                                                                .FAIL)
+                                                .onPrometheusNonRetriableError(
+                                                        PrometheusSinkConfiguration.OnErrorBehavior
+                                                                .DISCARD_AND_CONTINUE)
+                                                .build())
+                                .setMetricGroupName("test")
+                                .build();
+        assertNotNull(sink);
+    }
+
+    @Test
+    void shouldFailIfPrometheusRemoteWriteUrlIsMissing() {
+        assertThrows(IllegalArgumentException.class, () -> new PrometheusSinkBuilder().build());
+    }
+
+    @Test
+    void shouldFailIfPrometheusRemoteWriteUrlIsInvalid() {
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> new PrometheusSinkBuilder().setPrometheusRemoteWriteUrl("invalid").build());
+    }
+}
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkSerializationTest.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkSerializationTest.java
new file mode 100644
index 0000000..a7bb6ef
--- /dev/null
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkSerializationTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.util.InstantiationUtil.isSerializable;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PrometheusSinkSerializationTest {
+
+    @Test
+    void shouldBeActuallySerializable() {
+        PrometheusSink sink =
+                (PrometheusSink)
+                        PrometheusSink.builder()
+                                .setPrometheusRemoteWriteUrl("http://example.com/endpoint")
+                                .build();
+
+        assertTrue(isSerializable(sink), "The sink object should be serializable");
+    }
+}
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
index be88e5c..ef644f7 100644
--- a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
@@ -20,6 +20,7 @@
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 class SinkWriterErrorHandlingBehaviorConfigurationTest {
 
@@ -44,9 +45,21 @@
     }
 
     @Test
-    public void shouldDefaultToFailOnPrometheusNonRetriableError() {
+    public void shouldDefaultToDiscardAndContinueOnPrometheusNonRetriableError() {
         assertEquals(
-                PrometheusSinkConfiguration.OnErrorBehavior.FAIL,
+                PrometheusSinkConfiguration.OnErrorBehavior.DISCARD_AND_CONTINUE,
                 DEFAULT_CONFIG.getOnPrometheusNonRetriableError());
     }
+
+    @Test
+    public void shouldPreventSettingContinueOnPrometheusNonRetriableErrorToFail() {
+        assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+                                .builder()
+                                .onPrometheusNonRetriableError(
+                                        PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+                                .build());
+    }
 }