[FLINK-33493][connectors/elasticsearch] Fix ElasticsearchWriterITCase test
diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
index bd020fb..e8002cc 100644
--- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
+++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
@@ -24,11 +24,11 @@
 import org.apache.flink.connector.elasticsearch.test.DockerImageVersions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
 import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -69,6 +69,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
 
 import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.DOCUMENT_TYPE;
 import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.buildMessage;
@@ -193,15 +194,16 @@
         final String index = "test-inc-byte-out";
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
-        final InternalSinkWriterMetricGroup metricGroup =
-                InternalSinkWriterMetricGroup.mock(
-                        metricListener.getMetricGroup(), operatorIOMetricGroup);
         final int flushAfterNActions = 2;
         final BulkProcessorConfig bulkProcessorConfig =
                 new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0);
 
         try (final ElasticsearchWriter<Tuple2<Integer, String>> writer =
-                createWriter(index, false, bulkProcessorConfig, metricGroup)) {
+                createWriter(
+                        index,
+                        false,
+                        bulkProcessorConfig,
+                        getSinkWriterMetricGroup(operatorIOMetricGroup))) {
             final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
             assertThat(numBytesOut.getCount()).isZero();
             writer.write(Tuple2.of(1, buildMessage(1)), null);
@@ -267,10 +269,7 @@
     private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
             String index, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig) {
         return createWriter(
-                index,
-                flushOnCheckpoint,
-                bulkProcessorConfig,
-                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
+                index, flushOnCheckpoint, bulkProcessorConfig, getSinkWriterMetricGroup());
     }
 
     private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
@@ -289,6 +288,40 @@
                 new TestMailbox());
     }
 
+    private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() { // builds its own IO group
+        final OperatorIOMetricGroup operatorIOMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+        return getSinkWriterMetricGroup(operatorIOMetricGroup); // delegates to the full overload
+    }
+
+    private TestingSinkWriterMetricGroup getSinkWriterMetricGroup(
+            OperatorIOMetricGroup operatorIOMetricGroup) {
+        MetricGroup parentMetricGroup = metricListener.getMetricGroup(); // registration target
+        Counter numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
+        Counter numRecordsSendErrors = // reuses numRecordsOutErrors
+                parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, numRecordsOutErrors);
+        Counter numRecordsWritten =
+                parentMetricGroup.counter(
+                        MetricNames.NUM_RECORDS_SEND,
+                        operatorIOMetricGroup.getNumRecordsOutCounter()); // IO group's own counter
+        Counter numBytesWritten = // likewise for bytes out
+                parentMetricGroup.counter(
+                        MetricNames.NUM_BYTES_SEND, operatorIOMetricGroup.getNumBytesOutCounter());
+        Consumer<Gauge<Long>> currentSendTimeGaugeConsumer = // registers the gauge once set
+                currentSendTimeGauge ->
+                        parentMetricGroup.gauge(
+                                MetricNames.CURRENT_SEND_TIME, currentSendTimeGauge);
+        return new TestingSinkWriterMetricGroup.Builder()
+                .setParentMetricGroup(parentMetricGroup)
+                .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
+                .setNumRecordsOutErrorsCounterSupplier(() -> numRecordsOutErrors)
+                .setNumRecordsSendErrorsCounterSupplier(() -> numRecordsSendErrors)
+                .setNumRecordsSendCounterSupplier(() -> numRecordsWritten)
+                .setNumBytesSendCounterSupplier(() -> numBytesWritten)
+                .setCurrentSendTimeGaugeConsumer(currentSendTimeGaugeConsumer)
+                .build();
+    }
+
     private static class TestBulkProcessorBuilderFactory implements BulkProcessorBuilderFactory {
         @Override
         public BulkProcessor.Builder apply(
diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java
new file mode 100644
index 0000000..b122d66
--- /dev/null
+++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/** Test {@link SinkWriterMetricGroup} whose metrics come from injected suppliers/consumers. */
+public class TestingSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
+        implements SinkWriterMetricGroup {
+
+    private final Supplier<Counter> numRecordsOutErrorsCounterSupplier;
+
+    private final Supplier<Counter> numRecordsSendErrorsCounterSupplier;
+
+    private final Supplier<Counter> numRecordsSendCounterSupplier;
+
+    private final Supplier<Counter> numBytesSendCounterSupplier;
+
+    private final Consumer<Gauge<Long>> currentSendTimeGaugeConsumer;
+
+    private final Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier;
+
+    public TestingSinkWriterMetricGroup(
+            MetricGroup parentMetricGroup,
+            Supplier<Counter> numRecordsOutErrorsCounterSupplier,
+            Supplier<Counter> numRecordsSendErrorsCounterSupplier,
+            Supplier<Counter> numRecordsSendCounterSupplier,
+            Supplier<Counter> numBytesSendCounterSupplier,
+            Consumer<Gauge<Long>> currentSendTimeGaugeConsumer,
+            Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+        super(parentMetricGroup);
+        this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier;
+        this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier;
+        this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+        this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+        this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+        this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+    }
+
+    @Override
+    public Counter getNumRecordsOutErrorsCounter() {
+        return numRecordsOutErrorsCounterSupplier.get(); // queried on every call
+    }
+
+    @Override
+    public Counter getNumRecordsSendErrorsCounter() {
+        return numRecordsSendErrorsCounterSupplier.get();
+    }
+
+    @Override
+    public Counter getNumRecordsSendCounter() {
+        return numRecordsSendCounterSupplier.get();
+    }
+
+    @Override
+    public Counter getNumBytesSendCounter() {
+        return numBytesSendCounterSupplier.get();
+    }
+
+    @Override
+    public void setCurrentSendTimeGauge(Gauge<Long> gauge) {
+        currentSendTimeGaugeConsumer.accept(gauge); // gauge is forwarded, not stored here
+    }
+
+    @Override
+    public OperatorIOMetricGroup getIOMetricGroup() {
+        return ioMetricGroupSupplier.get();
+    }
+
+    /** Builder for {@link TestingSinkWriterMetricGroup}; unset suppliers yield {@code null}. */
+    public static class Builder {
+
+        private MetricGroup parentMetricGroup = null; // no default parent group
+
+        private Supplier<Counter> numRecordsOutErrorsCounterSupplier = () -> null;
+
+        private Supplier<Counter> numRecordsSendErrorsCounterSupplier = () -> null;
+
+        private Supplier<Counter> numRecordsSendCounterSupplier = () -> null;
+
+        private Supplier<Counter> numBytesSendCounterSupplier = () -> null;
+
+        private Consumer<Gauge<Long>> currentSendTimeGaugeConsumer = counter -> {}; // no-op
+
+        private Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier = () -> null;
+
+        public Builder setParentMetricGroup(MetricGroup parentMetricGroup) {
+            this.parentMetricGroup = parentMetricGroup;
+            return this;
+        }
+
+        public Builder setNumRecordsOutErrorsCounterSupplier(
+                Supplier<Counter> numRecordsOutErrorsCounterSupplier) {
+            this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier;
+            return this;
+        }
+
+        public Builder setNumRecordsSendErrorsCounterSupplier(
+                Supplier<Counter> numRecordsSendErrorsCounterSupplier) {
+            this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier;
+            return this;
+        }
+
+        public Builder setNumRecordsSendCounterSupplier(
+                Supplier<Counter> numRecordsSendCounterSupplier) {
+            this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+            return this;
+        }
+
+        public Builder setNumBytesSendCounterSupplier(
+                Supplier<Counter> numBytesSendCounterSupplier) {
+            this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+            return this;
+        }
+
+        public Builder setCurrentSendTimeGaugeConsumer(
+                Consumer<Gauge<Long>> currentSendTimeGaugeConsumer) {
+            this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+            return this;
+        }
+
+        public Builder setIoMetricGroupSupplier(
+                Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+            this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+            return this;
+        }
+
+        public TestingSinkWriterMetricGroup build() {
+            return new TestingSinkWriterMetricGroup(
+                    parentMetricGroup,
+                    numRecordsOutErrorsCounterSupplier,
+                    numRecordsSendErrorsCounterSupplier,
+                    numRecordsSendCounterSupplier,
+                    numBytesSendCounterSupplier,
+                    currentSendTimeGaugeConsumer,
+                    ioMetricGroupSupplier);
+        }
+    }
+}