Implementation of the Prometheus sink connector for DataStream API.
The sink writes to Prometheus using the Remote-Write interface, based on Remote-Write specifications version 1.0
Due to the strict ordering and format requirements of Prometheus Remote-Write, the sink requires that users ensure that input data is in order and well-formed, before sending it to the sink.
For efficiency, the connector does not do any validation. If input is out of order or malformed, the write request is rejected by Prometheus. If the write request is rejected, depending on the configured error handling behaviour for “Prometheus non-retriable errors”, the sink will either throw an exception (FAIL
, default behavior) or discard the offending request and continue (DISCARD_AND_CONTINUE
). See error handling behaviour, below, for further details.
The sink receives as input time-series, each containing one or more samples. To optimise the write throughput, input time-series are batched, in the order they are received, and written with a single write-request.
If a write-request contains any out-of-order or malformed data, the entire request is rejected and all time series are discarded. The reason is Remote-Write specifications explicitly forbids retrying of rejected write requests (4xx responses). and the Prometheus response does not contain enough information to efficiently partially retry the write, discarding the offending data.
It is responsibility of the application sending the data to the sink in the correct order and format.
KeySelector
is provided to partition input correctly ( see Partitioning, below).To help sending well-formed data to the sink, the connector expect PrometheusTimeSeries
POJOs as input.
Each PrometheusTimeSeries
instance maps 1-to-1 to a remote-write TimeSeries
. Each object contains:
metericName
, mapped to the special __name__
labelSamples
{ value: double, timestamp: long } - MUST BE IN TIMESTAMP ORDERPrometheusTimeSeries
provides a builder interface.
// List<Tuple2<Double, Long>> samples = ... PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder() .withMetricName("CPU") // mapped to `__name__` label .addLabel("InstanceID", instanceId) .addLabel("AccountID", accountId); for( Tuple2<Double, Long> sample :samples){ tsBuilder. addSample(sample.f0, sample.f1); } PrometheusTimeSeries ts = tsBuilder.build();
Prometheus imposes strict constraints to the content sent to remote-write, including label format and ordering, sample time ordering etc.
For efficiency, the sink does not do any validation or reordering of the input. It's responsibility of the application ensuring that input is well-formed.
Any malformed data will be rejected on write to Prometheus. Depending on the error handling behaviours configured, the sink will throw an exception stopping the job (default), or drop the entire write-request, log the fact, and continue.
For complete details about these constraints, refer to the remote-write specifications.
The sink batches multiple time-series into a single write-request, retaining the order..
Batching is based on the number of samples. Each write-request contains up to 500 samples, with a max buffering time of 5 seconds (both configurable). The number of time-series doesn't matter.
As by Prometheus Remote-Write specifications, the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample ordering, and uses and exponential backoff.
The exponential backoff starts with an initial delay (default 30 ms) and increases it exponentially up to a max retry delay (default 5 sec). It continues retrying until the max number of retries is reached (default reties forever).
On non-retriable error response (4xx, except 429, non retryable exceptions), or on reaching the retry limit, depending on the configured error handling behaviour for “Max retries exceeded”, the sink will either throw an exception (FAIL
, default behaviour), or discard the entire write-request, log a warning and continue. See error handling behaviour, below, for further details.
Example of sink initialisation (for documentation purposes, we are setting all parameters to their default values): Sink
PrometheusSink sink = PrometheusSink.builder() .setMaxBatchSizeInSamples(500) // Batch size (write-request size), in samples (default: 500) .setMaxRecordSizeInSamples(500) // Max sink input record size, in samples (default: 500), must be <= maxBatchSizeInSamples - If exceeded the job will continuously fail and restart! .setMaxTimeInBufferMS(5000) // Max time a time-series is buffered for batching (default: 5000 ms) .setRetryConfiguration(RetryConfiguration.builder() .setInitialRetryDelayMS(30L) // Initial retry delay (default: 30 ms) .setMaxRetryDelayMS(5000L) // Maximum retry delay, with exponential backoff (default: 5000 ms) .setMaxRetryCount(100) // Max number of retries (default: 100) .build()) .setSocketTimeoutMs(5000) // Http client socket timeout (default: 5000 ms) .setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl) // Remote-write URL .setRequestSigner(new AmazonManagedPrometheusWriteRequestSigner(prometheusRemoteWriteUrl, prometheusRegion)) // Optional request signed (AMP request signer in this example) .setErrorHandlingBehaviourConfiguration(SinkWriterErrorHandlingBehaviorConfiguration.builder() // Error handling behaviors. See description below, for more details. All behaviors default to FAIL .onPrometheusNonRetriableError(OnErrorBehavior.FAIL) .onMaxRetryExceeded(OnErrorBehavior.FAIL) .onHttpClientIOFail(OnErrorBehavior.FAIL) .build()) .setMetricGroupName("Prometheus") // Customizable metric-group suffix (default: "Prometheus") .build();
When the sink has parallelism > 1, the stream must be partitioned so that all time-series with same labels go to the same sink operator sub-task. If this is not the case, samples may be written out-of-order, and be rejected by Prometheus.
A keyBy()
using the provided key selector, PrometheusTimeSeriesLabelsAndMetricNameKeySelector
, automatically partitions the time-series by labels.
The sink supports optional request-signing for authentication, implementing the PrometheusRequestSigner
interface.
The sink complies with Prometheus remote-write specs not retrying any request that return status codes 4xx
, except 429
, and retrying requests that return 5xx
or 429
, with an exponential backoff strategy.
The retry strategy can be configured, as shown in the following snippet:
PrometheusSink sink = PrometheusSink.builder() .setRetryConfiguration(RetryConfiguration.builder() .setInitialRetryDelayMS(30L) // Initial retry delay (default: 30 ms) .setMaxRetryDelayMS(5000L) // Maximum retray delay, with exponential backoff (default: 5000 ms) .setMaxRetryCount(100) // Max number of retries (default: 100) .build()) // ... .build();
The behaviour of the sink, when an unrecoverable error happens while writing to Prometheus remote-write endpoint, is configurable.
There are 3 error conditions:
4xx
status code except 429
)5xx
or 429
) but the max retry limit is exceededThe default behavior of the sink, for all these three error conditions, is to FAIL
: throw a PrometheusSinkWriteException
, causing the job to fail.
Optionally, for each of these error conditions independently, the sink can be configured to DISCARD_AND_CONTINUE
: " log, discard the offending request and continue".
The error handling behaviors can be configured when creating the instance of the sink, as shown in this snipped:
PrometheusSink sink = PrometheusSink.builder() // ... .setErrorHandlingBehaviourConfiguration(SinkWriterErrorHandlingBehaviorConfiguration.builder() .onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE) .onMaxRetryExceeded(OnErrorBehavior.DISCARD_AND_CONTINUE) .onHttpClientIOFail(OnErrorBehavior.DISCARD_AND_CONTINUE) .build()) .build();
When configured for DISCARD_AND_CONTINUE
, the sink will do the following:
WARN
level, with information about the problem and the number of time-series and samples droppedNote that there is no partial-failure condition: the entire write-request is discarded regardless what data in the request are causing the problem. Prometheus does not return sufficient information to automatically handle partial requests.
The sink exposes custom metrics, counting the samples and write-requests (batches) successfully written or discarded.
numSamplesOut
number of samples successfully written to PrometheusnumWriteRequestsOut
number of write-requests successfully written to PrometheusnumWriteRequestsRetries
number of write requests retried due to a retriable error (e.g. throttling)numSamplesDropped
number of samples dropped, for any reasonsnumSamplesNonRetriableDropped
(when onPrometheusNonRetriableError
is set to DISCARD_AND_CONTINUE
) number of samples dropped due to non-retriable errorsnumSamplesRetryLimitDropped
(when onMaxRetryExceeded
is set to DISCARD_AND_CONTINUE
) number of samples dropped due to reaching the max number of retriesnumWriteRequestsPermanentlyFailed
number of write requests permanently failed, due to any reasons (non retryable, max nr of retries)Note: the numBytesOut
does not measure the number of bytes, due to an internal limitation of the base sink. This metric should be ignored, and you should rely on numSamplesOut
and numWriteRequestsOut
instead.
These custom metrics are exposed on partially customizable scope. By default, the scope is Sink__Writer.Prometheus
. It can be customized to any Sink__Writer.<metric-group>
.
The connector includes classes for Protobuf objects, Remote, Types, and GoGoProtos.
The *.proto files are provided for reference only. Protobuf binaries are required to re-generate the Java classe. Please refer to Protobuf documentation.
You can find a complete application example using the connector in DataStreamExample.java.