blob: d60d551e17ddff93938cf4d33a86aa11307f8dd3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.sink.writer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.doris.flink.rest.models.RespContent;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
/**
* the metrics for Doris Writer.
*
* @since 1.6
*/
public class DorisWriteMetrics implements Serializable {
private static final long serialVersionUID = 1L;
// Window size of histogram metrics.
private static final int HISTOGRAM_WINDOW_SIZE = 100;
private static final List<String> DORIS_SUCCESS_STATUS =
new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
private final String tableIdentifier;
AtomicLong totalRows = new AtomicLong();
// Counter
private transient Counter totalFlushLoadBytes;
private transient Counter totalFlushNumberTotalRows;
private transient Counter totalFlushLoadedRows;
private transient Counter totalFlushTimeMs;
private transient Counter totalFlushSucceededTimes;
private transient Counter totalFlushFailedTimes;
private transient Counter totalFlushFilteredRows;
private transient Counter totalFlushUnselectedRows;
// Histogram
private transient Histogram beginTxnTimeHistogramMs;
private transient Histogram commitAndPublishTimeHistogramMs;
private transient Histogram streamLoadPutTimeHistogramMs;
private transient Histogram readDataTimeHistogramMs;
private transient Histogram writeDataTimeHistogramMs;
private transient Histogram loadTimeHistogramMs;
private static final String COUNTER_TOTAL_FLUSH_BYTES = "totalFlushLoadBytes";
private static final String COUNTER_TOTAL_FLUSH_ROWS = "flushTotalNumberRows";
private static final String COUNTER_TOTAL_FLUSH_LOADED_ROWS = "totalFlushLoadedRows";
private static final String COUNTER_TOTAL_FLUSH_COST_TIME = "totalFlushTimeMs";
private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES_COUNT =
"totalFlushSucceededNumber";
private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES_COUNT = "totalFlushFailedNumber";
private static final String COUNTER_TOTAL_FILTERED_ROWS = "totalFlushFilteredRows";
private static final String COUNTER_TOTAL_UNSELECTED_ROWS = "totalFlushUnselectedRows";
private static final String HISTOGRAM_BEGIN_TXN_TIME_MS = "beginTxnTimeMs";
private static final String HISTOGRAM_STREAM_LOAD_PUT_DATA_TIME_MS = "putDataTimeMs";
private static final String HISTOGRAM_READ_DATA_TIME_MS = "readDataTimeMs";
private static final String HISTOGRAM_WRITE_DATA_TIME_MS = "writeDataTimeMs";
private static final String HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS = "commitAndPublishTimeMs";
private static final String HISTOGRAM_LOAD_TIME_MS = "loadTimeMs";
private static final String METRIC_NAME_FORMAT = "%s_%s";
@VisibleForTesting
DorisWriteMetrics(SinkWriterMetricGroup sinkMetricGroup, String tableIdentifier) {
this.tableIdentifier = tableIdentifier;
register(sinkMetricGroup);
}
public static DorisWriteMetrics of(
SinkWriterMetricGroup sinkWriterMetricGroup, String tableIdentifier) {
return new DorisWriteMetrics(sinkWriterMetricGroup, tableIdentifier);
}
public void flush(RespContent respContent) {
if (respContent != null && DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
flushSuccessLoad(respContent);
} else {
flushFailedRecord();
}
}
@VisibleForTesting
public void register(SinkWriterMetricGroup sinkMetricGroup) {
totalFlushNumberTotalRows =
sinkMetricGroup.counter(
String.format(
METRIC_NAME_FORMAT, tableIdentifier, COUNTER_TOTAL_FLUSH_ROWS));
totalFlushLoadedRows =
sinkMetricGroup.counter(
String.format(
METRIC_NAME_FORMAT,
tableIdentifier,
COUNTER_TOTAL_FLUSH_LOADED_ROWS));
totalFlushLoadBytes =
sinkMetricGroup.counter(
String.format(
METRIC_NAME_FORMAT, tableIdentifier, COUNTER_TOTAL_FLUSH_BYTES));
totalFlushFilteredRows =
sinkMetricGroup.counter(
String.format(
METRIC_NAME_FORMAT, tableIdentifier, COUNTER_TOTAL_FILTERED_ROWS));
totalFlushUnselectedRows =
sinkMetricGroup.counter(
String.format(
METRIC_NAME_FORMAT,
tableIdentifier,
COUNTER_TOTAL_UNSELECTED_ROWS));
totalFlushSucceededTimes =
sinkMetricGroup.counter(
String.format(
METRIC_NAME_FORMAT,
tableIdentifier,
COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES_COUNT));
totalFlushFailedTimes =
sinkMetricGroup.counter(
String.format(
METRIC_NAME_FORMAT,
tableIdentifier,
COUNTER_TOTAL_FLUSH_FAILED_TIMES_COUNT));
totalFlushTimeMs =
sinkMetricGroup.counter(
String.format(
METRIC_NAME_FORMAT,
tableIdentifier,
COUNTER_TOTAL_FLUSH_COST_TIME));
loadTimeHistogramMs =
sinkMetricGroup.histogram(
String.format(METRIC_NAME_FORMAT, tableIdentifier, HISTOGRAM_LOAD_TIME_MS),
new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
streamLoadPutTimeHistogramMs =
sinkMetricGroup.histogram(
String.format(
METRIC_NAME_FORMAT,
tableIdentifier,
HISTOGRAM_STREAM_LOAD_PUT_DATA_TIME_MS),
new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
commitAndPublishTimeHistogramMs =
sinkMetricGroup.histogram(
String.format(
METRIC_NAME_FORMAT,
tableIdentifier,
HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS),
new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
this.beginTxnTimeHistogramMs =
sinkMetricGroup.histogram(
String.format(
METRIC_NAME_FORMAT, tableIdentifier, HISTOGRAM_BEGIN_TXN_TIME_MS),
new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
readDataTimeHistogramMs =
sinkMetricGroup.histogram(
String.format(
METRIC_NAME_FORMAT, tableIdentifier, HISTOGRAM_READ_DATA_TIME_MS),
new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
writeDataTimeHistogramMs =
sinkMetricGroup.histogram(
String.format(
METRIC_NAME_FORMAT, tableIdentifier, HISTOGRAM_WRITE_DATA_TIME_MS),
new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE));
}
private void flushSuccessLoad(RespContent responseContent) {
Optional.ofNullable(responseContent.getLoadBytes()).ifPresent(totalFlushLoadBytes::inc);
Optional.ofNullable(responseContent.getNumberLoadedRows())
.ifPresent(totalFlushLoadedRows::inc);
Optional.ofNullable(responseContent.getNumberTotalRows())
.ifPresent(totalFlushNumberTotalRows::inc);
Optional.ofNullable(responseContent.getNumberFilteredRows())
.ifPresent(totalFlushFilteredRows::inc);
Optional.ofNullable(responseContent.getLoadTimeMs()).ifPresent(totalFlushTimeMs::inc);
Optional.ofNullable(responseContent.getNumberUnselectedRows())
.ifPresent(totalFlushUnselectedRows::inc);
totalFlushSucceededTimes.inc();
Optional.ofNullable(responseContent.getCommitAndPublishTimeMs())
.ifPresent(commitAndPublishTimeHistogramMs::update);
Optional.ofNullable(responseContent.getWriteDataTimeMs())
.ifPresent(writeDataTimeHistogramMs::update);
Optional.ofNullable(responseContent.getBeginTxnTimeMs())
.ifPresent(beginTxnTimeHistogramMs::update);
Optional.ofNullable(responseContent.getReadDataTimeMs())
.ifPresent(readDataTimeHistogramMs::update);
Optional.ofNullable(responseContent.getStreamLoadPutTimeMs())
.ifPresent(streamLoadPutTimeHistogramMs::update);
Optional.ofNullable(responseContent.getLoadTimeMs()).ifPresent(loadTimeHistogramMs::update);
}
private void flushFailedRecord() {
totalFlushFailedTimes.inc();
}
public String getTableIdentifier() {
return tableIdentifier;
}
public Counter getTotalFlushLoadBytes() {
return totalFlushLoadBytes;
}
public Counter getTotalFlushNumberTotalRows() {
return totalFlushNumberTotalRows;
}
public Counter getTotalFlushLoadedRows() {
return totalFlushLoadedRows;
}
public Counter getTotalFlushTimeMs() {
return totalFlushTimeMs;
}
public Counter getTotalFlushSucceededTimes() {
return totalFlushSucceededTimes;
}
public Counter getTotalFlushFailedTimes() {
return totalFlushFailedTimes;
}
public Counter getTotalFlushFilteredRows() {
return totalFlushFilteredRows;
}
public Counter getTotalFlushUnselectedRows() {
return totalFlushUnselectedRows;
}
public Histogram getBeginTxnTimeHistogramMs() {
return beginTxnTimeHistogramMs;
}
public Histogram getCommitAndPublishTimeHistogramMs() {
return commitAndPublishTimeHistogramMs;
}
public Histogram getStreamLoadPutTimeHistogramMs() {
return streamLoadPutTimeHistogramMs;
}
public Histogram getReadDataTimeHistogramMs() {
return readDataTimeHistogramMs;
}
public Histogram getWriteDataTimeHistogramMs() {
return writeDataTimeHistogramMs;
}
public Histogram getLoadTimeHistogramMs() {
return loadTimeHistogramMs;
}
@VisibleForTesting
public void setTotalFlushLoadBytes(Counter totalFlushLoadBytes) {
this.totalFlushLoadBytes = totalFlushLoadBytes;
}
@VisibleForTesting
public void setTotalFlushNumberTotalRows(Counter totalFlushNumberTotalRows) {
this.totalFlushNumberTotalRows = totalFlushNumberTotalRows;
}
@VisibleForTesting
public void setTotalFlushLoadedRows(Counter totalFlushLoadedRows) {
this.totalFlushLoadedRows = totalFlushLoadedRows;
}
@VisibleForTesting
public void setTotalFlushTimeMs(Counter totalFlushTimeMs) {
this.totalFlushTimeMs = totalFlushTimeMs;
}
@VisibleForTesting
public void setTotalFlushSucceededTimes(Counter totalFlushSucceededTimes) {
this.totalFlushSucceededTimes = totalFlushSucceededTimes;
}
@VisibleForTesting
public void setTotalFlushFailedTimes(Counter totalFlushFailedTimes) {
this.totalFlushFailedTimes = totalFlushFailedTimes;
}
@VisibleForTesting
public void setTotalFlushFilteredRows(Counter totalFlushFilteredRows) {
this.totalFlushFilteredRows = totalFlushFilteredRows;
}
@VisibleForTesting
public void setTotalFlushUnselectedRows(Counter totalFlushUnselectedRows) {
this.totalFlushUnselectedRows = totalFlushUnselectedRows;
}
@VisibleForTesting
public void setBeginTxnTimeHistogramMs(Histogram beginTxnTimeHistogramMs) {
this.beginTxnTimeHistogramMs = beginTxnTimeHistogramMs;
}
@VisibleForTesting
public void setCommitAndPublishTimeHistogramMs(Histogram commitAndPublishTimeHistogramMs) {
this.commitAndPublishTimeHistogramMs = commitAndPublishTimeHistogramMs;
}
@VisibleForTesting
public void setStreamLoadPutTimeHistogramMs(Histogram streamLoadPutTimeHistogramMs) {
this.streamLoadPutTimeHistogramMs = streamLoadPutTimeHistogramMs;
}
@VisibleForTesting
public void setReadDataTimeHistogramMs(Histogram readDataTimeHistogramMs) {
this.readDataTimeHistogramMs = readDataTimeHistogramMs;
}
@VisibleForTesting
public void setWriteDataTimeHistogramMs(Histogram writeDataTimeHistogramMs) {
this.writeDataTimeHistogramMs = writeDataTimeHistogramMs;
}
@VisibleForTesting
public void setLoadTimeHistogramMs(Histogram loadTimeHistogramMs) {
this.loadTimeHistogramMs = loadTimeHistogramMs;
}
}