[FLINK-33493][connectors/elasticsearch] Fix ElasticsearchWriterITCase test
diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
index bd020fb..e8002cc 100644
--- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
+++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
@@ -24,11 +24,11 @@
import org.apache.flink.connector.elasticsearch.test.DockerImageVersions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -69,6 +69,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Consumer;
import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.DOCUMENT_TYPE;
import static org.apache.flink.connector.elasticsearch.sink.TestClientBase.buildMessage;
@@ -193,15 +194,16 @@
final String index = "test-inc-byte-out";
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
- final InternalSinkWriterMetricGroup metricGroup =
- InternalSinkWriterMetricGroup.mock(
- metricListener.getMetricGroup(), operatorIOMetricGroup);
final int flushAfterNActions = 2;
final BulkProcessorConfig bulkProcessorConfig =
new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0);
try (final ElasticsearchWriter<Tuple2<Integer, String>> writer =
- createWriter(index, false, bulkProcessorConfig, metricGroup)) {
+ createWriter(
+ index,
+ false,
+ bulkProcessorConfig,
+ getSinkWriterMetricGroup(operatorIOMetricGroup))) {
final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
assertThat(numBytesOut.getCount()).isZero();
writer.write(Tuple2.of(1, buildMessage(1)), null);
@@ -267,10 +269,7 @@
private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
String index, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig) {
return createWriter(
- index,
- flushOnCheckpoint,
- bulkProcessorConfig,
- InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
+ index, flushOnCheckpoint, bulkProcessorConfig, getSinkWriterMetricGroup());
}
private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
@@ -289,6 +288,40 @@
new TestMailbox());
}
+    private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() {
+        return getSinkWriterMetricGroup(
+                UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
+                        .getIOMetricGroup());
+    }
+
+    private TestingSinkWriterMetricGroup getSinkWriterMetricGroup(
+            OperatorIOMetricGroup operatorIOMetricGroup) {
+        // All writer metrics are registered on the MetricListener's metric group.
+        MetricGroup listenerGroup = metricListener.getMetricGroup();
+        Counter outErrors = listenerGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
+        // The send-errors name is registered with the very same counter as out-errors.
+        Counter sendErrors = listenerGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, outErrors);
+        // The "send" metrics are registered with the IO group's records/bytes-out counters.
+        Counter recordsSend =
+                listenerGroup.counter(
+                        MetricNames.NUM_RECORDS_SEND,
+                        operatorIOMetricGroup.getNumRecordsOutCounter());
+        Counter bytesSend =
+                listenerGroup.counter(
+                        MetricNames.NUM_BYTES_SEND, operatorIOMetricGroup.getNumBytesOutCounter());
+        Consumer<Gauge<Long>> sendTimeRegistrar =
+                gauge -> listenerGroup.gauge(MetricNames.CURRENT_SEND_TIME, gauge);
+        return new TestingSinkWriterMetricGroup.Builder()
+                .setParentMetricGroup(listenerGroup)
+                .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
+                .setNumRecordsOutErrorsCounterSupplier(() -> outErrors)
+                .setNumRecordsSendErrorsCounterSupplier(() -> sendErrors)
+                .setNumRecordsSendCounterSupplier(() -> recordsSend)
+                .setNumBytesSendCounterSupplier(() -> bytesSend)
+                .setCurrentSendTimeGaugeConsumer(sendTimeRegistrar)
+                .build();
+    }
+
private static class TestBulkProcessorBuilderFactory implements BulkProcessorBuilderFactory {
@Override
public BulkProcessor.Builder apply(
diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java
new file mode 100644
index 0000000..b122d66
--- /dev/null
+++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/TestingSinkWriterMetricGroup.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation for {@link SinkWriterMetricGroup}: each metric accessor delegates to a
+ * caller-supplied {@link Supplier} or {@link Consumer}. Create instances via {@link Builder}.
+ */
+public class TestingSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
+        implements SinkWriterMetricGroup {
+
+    private final Supplier<Counter> numRecordsOutErrorsCounterSupplier;
+    private final Supplier<Counter> numRecordsSendErrorsCounterSupplier;
+    private final Supplier<Counter> numRecordsSendCounterSupplier;
+    private final Supplier<Counter> numBytesSendCounterSupplier;
+    private final Consumer<Gauge<Long>> currentSendTimeGaugeConsumer;
+    private final Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier;
+
+    public TestingSinkWriterMetricGroup(
+            MetricGroup parentMetricGroup,
+            Supplier<Counter> numRecordsOutErrorsCounterSupplier,
+            Supplier<Counter> numRecordsSendErrorsCounterSupplier,
+            Supplier<Counter> numRecordsSendCounterSupplier,
+            Supplier<Counter> numBytesSendCounterSupplier,
+            Consumer<Gauge<Long>> currentSendTimeGaugeConsumer,
+            Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+        super(parentMetricGroup);
+        this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier;
+        this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier;
+        this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+        this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+        this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+        this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+    }
+
+    @Override
+    public Counter getNumRecordsOutErrorsCounter() {
+        return numRecordsOutErrorsCounterSupplier.get();
+    }
+
+    @Override
+    public Counter getNumRecordsSendErrorsCounter() {
+        return numRecordsSendErrorsCounterSupplier.get();
+    }
+
+    @Override
+    public Counter getNumRecordsSendCounter() {
+        return numRecordsSendCounterSupplier.get();
+    }
+
+    @Override
+    public Counter getNumBytesSendCounter() {
+        return numBytesSendCounterSupplier.get();
+    }
+
+    @Override
+    public void setCurrentSendTimeGauge(Gauge<Long> gauge) {
+        currentSendTimeGaugeConsumer.accept(gauge);
+    }
+
+    @Override
+    public OperatorIOMetricGroup getIOMetricGroup() {
+        return ioMetricGroupSupplier.get();
+    }
+
+    /**
+     * Builder for {@link TestingSinkWriterMetricGroup}. Unset counter and IO group suppliers
+     * return {@code null} and the unset gauge consumer ignores its argument.
+     */
+    public static class Builder {
+
+        // Required: build() rejects a missing parent.
+        private MetricGroup parentMetricGroup = null;
+        private Supplier<Counter> numRecordsOutErrorsCounterSupplier = () -> null;
+        private Supplier<Counter> numRecordsSendErrorsCounterSupplier = () -> null;
+        private Supplier<Counter> numRecordsSendCounterSupplier = () -> null;
+        private Supplier<Counter> numBytesSendCounterSupplier = () -> null;
+        private Consumer<Gauge<Long>> currentSendTimeGaugeConsumer = ignoredGauge -> {};
+        private Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier = () -> null;
+
+        public Builder setParentMetricGroup(MetricGroup parentMetricGroup) {
+            this.parentMetricGroup = parentMetricGroup;
+            return this;
+        }
+
+        public Builder setNumRecordsOutErrorsCounterSupplier(
+                Supplier<Counter> numRecordsOutErrorsCounterSupplier) {
+            this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier;
+            return this;
+        }
+
+        public Builder setNumRecordsSendErrorsCounterSupplier(
+                Supplier<Counter> numRecordsSendErrorsCounterSupplier) {
+            this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier;
+            return this;
+        }
+
+        public Builder setNumRecordsSendCounterSupplier(
+                Supplier<Counter> numRecordsSendCounterSupplier) {
+            this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+            return this;
+        }
+
+        public Builder setNumBytesSendCounterSupplier(
+                Supplier<Counter> numBytesSendCounterSupplier) {
+            this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+            return this;
+        }
+
+        public Builder setCurrentSendTimeGaugeConsumer(
+                Consumer<Gauge<Long>> currentSendTimeGaugeConsumer) {
+            this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+            return this;
+        }
+
+        public Builder setIoMetricGroupSupplier(
+                Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+            this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+            return this;
+        }
+
+        /** Builds the metric group; throws if no parent metric group has been set. */
+        public TestingSinkWriterMetricGroup build() {
+            if (parentMetricGroup == null) {
+                throw new IllegalStateException("parentMetricGroup must be set before build()");
+            }
+            return new TestingSinkWriterMetricGroup(
+                    parentMetricGroup,
+                    numRecordsOutErrorsCounterSupplier,
+                    numRecordsSendErrorsCounterSupplier,
+                    numRecordsSendCounterSupplier,
+                    numBytesSendCounterSupplier,
+                    currentSendTimeGaugeConsumer,
+                    ioMetricGroupSupplier);
+        }
+    }
+}