[FLINK-35504] Improve Elasticsearch 8 connector observability
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java
index a3eb8df..1163bf2 100644
--- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java
@@ -62,6 +62,13 @@
private boolean close = false;
private final Counter numRecordsOutErrorsCounter;
+ /**
+ * A counter to track number of records that are returned by Elasticsearch as failed and then
+ * retried by this writer.
+ */
+ private final Counter numRecordsSendPartialFailureCounter;
+ /** A counter to track the number of bulk requests that are sent to Elasticsearch. */
+ private final Counter numRequestSubmittedCounter;
private static final FatalExceptionClassifier ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER =
FatalExceptionClassifier.createChain(
@@ -103,11 +110,15 @@
checkNotNull(metricGroup);
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
+ this.numRecordsSendPartialFailureCounter =
+ metricGroup.counter("numRecordsSendPartialFailure");
+ this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
}
@Override
protected void submitRequestEntries(
List<Operation> requestEntries, Consumer<List<Operation>> requestResult) {
+ numRequestSubmittedCounter.inc();
LOG.debug("submitRequestEntries with {} items", requestEntries.size());
BulkRequest.Builder br = new BulkRequest.Builder();
@@ -133,7 +144,11 @@
List<Operation> requestEntries,
Consumer<List<Operation>> requestResult,
Throwable error) {
- LOG.debug("The BulkRequest of {} operation(s) has failed.", requestEntries.size());
+ LOG.warn(
+ "The BulkRequest of {} operation(s) has failed due to: {}",
+ requestEntries.size(),
+ error.getMessage());
+ LOG.debug("The BulkRequest has failed", error);
numRecordsOutErrorsCounter.inc(requestEntries.size());
if (isRetryable(error.getCause())) {
@@ -145,15 +160,17 @@
List<Operation> requestEntries,
Consumer<List<Operation>> requestResult,
BulkResponse response) {
+ LOG.debug("The BulkRequest has failed partially. Response: {}", response);
ArrayList<Operation> failedItems = new ArrayList<>();
for (int i = 0; i < response.items().size(); i++) {
if (response.items().get(i).error() != null) {
- numRecordsOutErrorsCounter.inc();
failedItems.add(requestEntries.get(i));
}
}
- LOG.debug(
+ numRecordsOutErrorsCounter.inc(failedItems.size());
+ numRecordsSendPartialFailureCounter.inc(failedItems.size());
+ LOG.info(
"The BulkRequest with {} operation(s) has {} failure(s). It took {}ms",
requestEntries.size(),
failedItems.size(),