[FLINK-35504] Improve Elasticsearch 8 connector observability
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java
index a3eb8df..1163bf2 100644
--- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java
@@ -62,6 +62,13 @@
     private boolean close = false;
 
     private final Counter numRecordsOutErrorsCounter;
+    /**
+     * A counter to track number of records that are returned by Elasticsearch as failed and then
+     * retried by this writer.
+     */
+    private final Counter numRecordsSendPartialFailureCounter;
+    /** A counter to track the number of bulk requests that are sent to Elasticsearch. */
+    private final Counter numRequestSubmittedCounter;
 
     private static final FatalExceptionClassifier ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER =
             FatalExceptionClassifier.createChain(
@@ -103,11 +110,15 @@
         checkNotNull(metricGroup);
 
         this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
+        this.numRecordsSendPartialFailureCounter =
+                metricGroup.counter("numRecordsSendPartialFailure");
+        this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
     }
 
     @Override
     protected void submitRequestEntries(
             List<Operation> requestEntries, Consumer<List<Operation>> requestResult) {
+        numRequestSubmittedCounter.inc();
         LOG.debug("submitRequestEntries with {} items", requestEntries.size());
 
         BulkRequest.Builder br = new BulkRequest.Builder();
@@ -133,7 +144,11 @@
             List<Operation> requestEntries,
             Consumer<List<Operation>> requestResult,
             Throwable error) {
-        LOG.debug("The BulkRequest of {} operation(s) has failed.", requestEntries.size());
+        LOG.warn(
+                "The BulkRequest of {} operation(s) has failed due to: {}",
+                requestEntries.size(),
+                error.getMessage());
+        LOG.debug("The BulkRequest has failed", error);
         numRecordsOutErrorsCounter.inc(requestEntries.size());
 
         if (isRetryable(error.getCause())) {
@@ -145,15 +160,17 @@
             List<Operation> requestEntries,
             Consumer<List<Operation>> requestResult,
             BulkResponse response) {
+        LOG.debug("The BulkRequest has failed partially. Response: {}", response);
         ArrayList<Operation> failedItems = new ArrayList<>();
         for (int i = 0; i < response.items().size(); i++) {
             if (response.items().get(i).error() != null) {
-                numRecordsOutErrorsCounter.inc();
                 failedItems.add(requestEntries.get(i));
             }
         }
 
-        LOG.debug(
+        numRecordsOutErrorsCounter.inc(failedItems.size());
+        numRecordsSendPartialFailureCounter.inc(failedItems.size());
+        LOG.info(
                 "The BulkRequest with {} operation(s) has {} failure(s). It took {}ms",
                 requestEntries.size(),
                 failedItems.size(),