## Flink Prometheus connector (sink)
Implementation of the Prometheus sink connector for the DataStream API.

The sink writes to Prometheus using the Remote-Write interface, based on the
[Remote-Write specifications, version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/).
### Guarantees and input restrictions
Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering)
and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements of Prometheus Remote-Write, the sink requires the input data to be well-formed and in order before it is sent to the sink.

For efficiency, the connector does not do any validation.
If the input is out of order or malformed, **the write request is rejected by Prometheus**.
On such errors, the connector currently discards the entire write-request containing the offending data, and continues.
See [error handling behavior](#error-handling-behavior), below, for further details.
The sink receives time-series as input, each containing one or more samples.
To optimize the write throughput, input time-series are batched, in the order they are received, and written to Prometheus with a single write-request.

If a write-request contains any out-of-order or malformed data, **the entire request is rejected** and all of its time-series are discarded.
The reason is that the Remote-Write specifications [explicitly forbid retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) rejected write requests (4xx responses), and the Prometheus response does not contain enough information to efficiently retry the write partially, discarding only the offending data.
### Responsibilities of the application
It is the responsibility of the application to send the data to the sink in the correct order and format.
1. Input time-series must be well-formed, e.g. containing only valid and non-duplicated labels, and samples in
   timestamp order (see [Labels and Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels)
   in the Prometheus Remote-Write specs).
2. Input time-series with identical labels are sent to the sink in timestamp order.
3. If sink parallelism > 1 is used, the input stream must be partitioned so that all time-series with identical labels
go to the same sink subtask. A `KeySelector` is provided to partition input correctly (
see [Partitioning](#partitioning), below).
#### Sink input objects
To help you send well-formed data to the sink, the connector
expects [`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java)
POJOs as input.

Each `PrometheusTimeSeries` instance maps 1-to-1 to
a [remote-write `TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). Each object contains:
* exactly one `metricName`, mapped to the special `__name__` label
* optionally, any number of additional labels { k: String, v: String } - MUST BE IN ORDER BY KEY
* one or more `Samples` { value: double, timestamp: long } - MUST BE IN TIMESTAMP ORDER
`PrometheusTimeSeries` provides a builder interface.
```java
// List<Tuple2<Double, Long>> samples = ...

// Labels must be added in order by key
PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder()
        .withMetricName("CPU") // mapped to the `__name__` label
        .addLabel("AccountID", accountId)
        .addLabel("InstanceID", instanceId);

// Samples must be added in timestamp order
for (Tuple2<Double, Long> sample : samples) {
    tsBuilder.addSample(sample.f0, sample.f1);
}

PrometheusTimeSeries ts = tsBuilder.build();
```
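For example, a minimal sketch mapping an input stream into `PrometheusTimeSeries` (the `Tuple3` input of `(instanceId, cpuValue, timestamp)` readings is illustrative, not part of the connector):

```java
// Hypothetical input stream of (instanceId, cpuValue, timestamp) readings
// DataStream<Tuple3<String, Double, Long>> cpuReadings = ...

DataStream<PrometheusTimeSeries> timeSeries = cpuReadings.map(reading ->
        PrometheusTimeSeries.builder()
                .withMetricName("CPU")
                .addLabel("InstanceID", reading.f0)
                .addSample(reading.f1, reading.f2)
                .build());
```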
#### Input data validation
Prometheus imposes strict constraints on the content sent to remote-write, including label format and ordering, sample
time ordering, etc.

For efficiency, the sink **does not do any validation or reordering** of the input. It is the responsibility
of the application to ensure that the input is well-formed.

Any malformed data is rejected on write to Prometheus. Per the
[error handling behavior](#error-handling-behavior) for non-retriable errors, the sink drops the entire write-request,
logs the problem, and continues.
For complete details about these constraints, refer to
the [remote-write specifications](https://prometheus.io/docs/concepts/remote_write_spec/).
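Because the sink does no validation, the application may want to pre-validate its input. As a sketch, the label name format mandated by the specifications can be checked with a regular expression (this check is illustrative and not part of the connector):

```java
import java.util.regex.Pattern;

// Label names must match this pattern, per the remote-write specifications
// (metric names additionally allow ':')
static final Pattern LABEL_NAME_PATTERN = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*");

static boolean isValidLabelName(String labelName) {
    return LABEL_NAME_PATTERN.matcher(labelName).matches();
}
```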
### Batching, blocking writes and retry
The sink batches multiple time-series into a single write-request, retaining the input order.

Batching is based on the number of samples: each write-request contains up to 500 samples, with a max buffering time of
5 seconds (both configurable). The number of time-series does not matter.
As per the [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff), the sink retries on 5xx and 429 responses. Retrying is blocking, to
retain sample ordering, and uses an exponential backoff.

The exponential backoff starts with an initial delay (default 30 ms) and increases it exponentially up to a max retry
delay (default 5 sec). The sink keeps retrying until the max number of retries is reached (by default, it retries forever).
On a non-retriable error response (any 4xx, except 429, or a non-retryable exception), the sink always discards the write-request and continues
(`DISCARD_AND_CONTINUE` behavior; see details below).
On reaching the retry limit, depending on the configured [error handling behavior](#error-handling-behavior) for
"Max retries exceeded", the sink either throws an exception (`FAIL`, the default behavior) or **discards the entire
write-request**, logs a warning, and continues. See
[error handling behavior](#error-handling-behavior), below, for further details.
### Initializing the sink
Example of sink initialization (for documentation purposes, every parameter is set explicitly; defaults are noted in the comments):
```java
PrometheusSink sink = PrometheusSink.builder()
        .setMaxBatchSizeInSamples(500)              // Batch size (write-request size), in samples (default: 500)
        .setMaxRecordSizeInSamples(500)             // Max sink input record size, in samples (default: 500).
                                                    // Must be <= maxBatchSizeInSamples; if exceeded, the job will continuously fail and restart!
        .setMaxTimeInBufferMS(5000)                 // Max time a time-series is buffered for batching (default: 5000 ms)
        .setRetryConfiguration(RetryConfiguration.builder()
                .setInitialRetryDelayMS(30L)        // Initial retry delay (default: 30 ms)
                .setMaxRetryDelayMS(5000L)          // Maximum retry delay, with exponential backoff (default: 5000 ms)
                .setMaxRetryCount(100)              // Max number of retries (default: retry forever)
                .build())
        .setSocketTimeoutMs(5000)                   // HTTP client socket timeout (default: 5000 ms)
        .setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl) // Remote-write URL
        .setRequestSigner(new AmazonManagedPrometheusWriteRequestSigner(prometheusRemoteWriteUrl, prometheusRegion)) // Optional request signer (AMP request signer in this example)
        .setErrorHandlingBehaviourConfiguration(SinkWriterErrorHandlingBehaviorConfiguration.builder()
                // Error handling behaviors. See the description below for more details.
                // Default is DISCARD_AND_CONTINUE for non-retriable errors
                .onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE)
                // Default is FAIL for the other error types
                .onMaxRetryExceeded(OnErrorBehavior.FAIL)
                .onHttpClientIOFail(OnErrorBehavior.FAIL)
                .build())
        .setMetricGroupName("Prometheus")           // Customizable metric-group suffix (default: "Prometheus")
        .build();
```
### Partitioning
When the sink has parallelism > 1, the stream must be partitioned so that all time-series with the same set of labels
go to the same sink operator sub-task. If this is not the case, samples may be written out of order, and be rejected by Prometheus.

A `keyBy()` using the provided key selector,
[`PrometheusTimeSeriesLabelsAndMetricNameKeySelector`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeriesLabelsAndMetricNameKeySelector.java),
automatically partitions the time-series by labels.
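For example, a minimal sketch (assuming `timeSeries` is a `DataStream<PrometheusTimeSeries>` and `sink` is a configured `PrometheusSink`):

```java
// DataStream<PrometheusTimeSeries> timeSeries = ...
// PrometheusSink sink = ...

// Partition by labels, so that all samples of a series reach the same subtask in order,
// then attach the sink
timeSeries
        .keyBy(new PrometheusTimeSeriesLabelsAndMetricNameKeySelector())
        .sinkTo(sink);
```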
### Authentication and Request Signing (optional)
The sink supports optional request-signing for authentication, through implementations of the
[`PrometheusRequestSigner`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusRequestSigner.java)
interface.
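As a sketch, a hypothetical signer adding a static bearer token to each request could look like the following (the method name and parameters are assumptions; check the linked interface for the exact contract):

```java
import java.util.Map;

// Hypothetical signer adding a static bearer token to every write-request
public class BearerTokenRequestSigner implements PrometheusRequestSigner {
    private final String token;

    public BearerTokenRequestSigner(String token) {
        this.token = token;
    }

    @Override
    public void addSignatureHeaders(Map<String, String> requestHeaders, byte[] requestBody) {
        // The request body is also available, for signers that must compute
        // a signature over the payload (e.g. AWS SigV4)
        requestHeaders.put("Authorization", "Bearer " + token);
    }
}
```

A ready-to-use signer for Amazon Managed Prometheus, `AmazonManagedPrometheusWriteRequestSigner`, is shown in the initialization example above.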
### Retriable errors
The sink complies with the Prometheus remote-write specs: it does not retry requests that return a `4xx` status code
(except `429`), and it retries requests that return `5xx` or `429`, with an exponential backoff strategy.
The retry strategy can be configured, as shown in the following snippet:
```java
PrometheusSink sink = PrometheusSink.builder()
        .setRetryConfiguration(RetryConfiguration.builder()
                .setInitialRetryDelayMS(30L) // Initial retry delay (default: 30 ms)
                .setMaxRetryDelayMS(5000L)   // Maximum retry delay, with exponential backoff (default: 5000 ms)
                .setMaxRetryCount(100)       // Max number of retries (default: retry forever)
                .build())
        // ...
        .build();
```
### Error handling behavior
The behavior of the sink, when an unrecoverable error happens while writing to the Prometheus remote-write endpoint, is
configurable.

The possible behaviors are:
* `FAIL`: throw a `PrometheusSinkWriteException`, causing the job to fail.
* `DISCARD_AND_CONTINUE`: log the reason of the error, discard the offending request, and continue.
There are 3 error conditions:

1. Prometheus returns a non-retriable error response (i.e. any `4xx` status code except `429`). Default: `DISCARD_AND_CONTINUE`.
2. Prometheus returns a retriable error response (i.e. `5xx` or `429`) but the max retry limit is exceeded. Default: `FAIL`.
3. The HTTP client fails to complete the request, due to an I/O error. Default: `FAIL`.

The error handling behaviors can be configured when creating the instance of the sink, as shown in this snippet:
```java
PrometheusSink sink = PrometheusSink.builder()
        // ...
        .setErrorHandlingBehaviourConfiguration(SinkWriterErrorHandlingBehaviorConfiguration.builder()
                .onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE)
                .onMaxRetryExceeded(OnErrorBehavior.DISCARD_AND_CONTINUE)
                .onHttpClientIOFail(OnErrorBehavior.DISCARD_AND_CONTINUE)
                .build())
        .build();
```
When configured for `DISCARD_AND_CONTINUE`, the sink will do the following:
1. Log a message at `WARN` level, with information about the problem and the number of time-series and samples dropped
2. Update the counters (see below)
3. Drop the offending write-request
4. Continue with the next request
Note that there is no partial-failure condition: the entire write-request is discarded regardless of which data in the
request caused the problem.
Prometheus does not return enough information to automatically handle partial failures.
> In the current connector version, `DISCARD_AND_CONTINUE` is the only supported behavior for non-retriable errors.
>
> The behavior cannot be set to `FAIL`: failing on a non-retriable error would make it impossible for the application to
> restart from checkpoint. Restarting from checkpoint causes some duplicates, which are rejected by
> Prometheus as out of order, in turn causing another non-retriable error, in an endless loop.
### Metrics
The sink exposes custom metrics, counting the samples and write-requests (batches) successfully written or discarded.

* `numSamplesOut` number of samples successfully written to Prometheus
* `numWriteRequestsOut` number of write-requests successfully written to Prometheus
* `numWriteRequestsRetries` number of write-requests retried due to a retriable error (e.g. throttling)
* `numSamplesDropped` number of samples dropped, for any reason
* `numSamplesNonRetriableDropped` (when `onPrometheusNonRetriableError` is set to `DISCARD_AND_CONTINUE`) number of
  samples dropped due to non-retriable errors
* `numSamplesRetryLimitDropped` (when `onMaxRetryExceeded` is set to `DISCARD_AND_CONTINUE`) number of samples dropped
  due to reaching the max number of retries
* `numWriteRequestsPermanentlyFailed` number of write-requests permanently failed, for any reason (non-retriable error,
  or max retries exceeded)
**Note**: the standard `numBytesOut` metric does not actually measure the number of bytes written, due to an internal
limitation of the base sink. This metric should be ignored; rely on `numSamplesOut` and `numWriteRequestsOut` instead.
#### Metrics scope
These custom metrics are exposed under a partially customizable scope.
By default, the scope is `Sink__Writer.Prometheus`. It can be customized to any `Sink__Writer.<metric-group>`, using
`setMetricGroupName(...)` on the sink builder.
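For example (the group name `MyRemoteWrite` is illustrative):

```java
// Custom metrics will be exposed under the scope Sink__Writer.MyRemoteWrite
PrometheusSink sink = PrometheusSink.builder()
        .setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl)
        .setMetricGroupName("MyRemoteWrite")
        .build();
```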
### Protobuf classes
The connector includes classes for the Protobuf
objects: [Remote](src/main/java/org/apache/flink/connector/prometheus/sink/prometheus/Remote.java),
[Types](src/main/java/org/apache/flink/connector/prometheus/sink/prometheus/Types.java),
and [GoGoProtos](src/main/java/org/apache/flink/connector/prometheus/sink/protobuf/GoGoProtos.java).

The [*.proto](src/main/proto) files are provided for reference only. Protobuf binaries are required to re-generate the
Java classes.
Please refer to the [Protobuf documentation](https://protobuf.dev/getting-started/javatutorial/#compiling-protocol-buffers).
### Example application
You can find a complete application example using the connector in
[DataStreamExample.java](src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java).