blob: 5b2b16a7c2efc922c65afbcc05849f50665179ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
/**
* AbstractStreamOperatorWithMetrics wraps the original output of the AbstractStreamOperator in a
* special output to collect metrics of operator in runtime, and accumulates the metrics when close
* operator.
*
* @param <OUT> The output type of the operator.
*/
public class AbstractStreamOperatorWithMetrics<OUT> extends AbstractStreamOperator<OUT> {
public static final String ACCUMULATOR_PREFIX = "OperatorMetric_";
public static final String ROW_COUNT_METRICS = "rowCount";
public static final String METRICS_CONF_KEY = "operatorMetricCollect";
public static final String METRICS_CONF_VALUE = "true";
private Counter counter;
protected boolean closed = false;
private transient Configuration sqlConf;
public AbstractStreamOperatorWithMetrics() {
setChainingStrategy(ChainingStrategy.ALWAYS);
}
// id of the rel who use this operator.
private int relID;
@Override
public void open() throws Exception {
super.open();
Output<StreamRecord<OUT>> rawOutput = this.output;
if (isCollectMetricEnabled()) {
counter = getMetricGroup().counter(ROW_COUNT_METRICS);
this.output = new Output<StreamRecord<OUT>>() {
@Override
public void collect(StreamRecord<OUT> record) {
// TODO: 1. suppose one record is one row here , does not take
// VectorizedColumnBatchTypeInfo into consideration yet.
// TODO: 2. only collect row counter here, does not collect other stats yet.
counter.inc();
rawOutput.collect(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
counter.inc();
rawOutput.collect(outputTag, record);
}
@Override
public void close() {
rawOutput.close();
}
@Override
public void emitWatermark(Watermark mark) {
rawOutput.emitWatermark(mark);
}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
rawOutput.emitLatencyMarker(latencyMarker);
}
};
}
}
@Override
public void close() throws Exception {
if (isCollectMetricEnabled()) {
LongCounter rowCountAcc = new LongCounter(counter.getCount());
String rowCountAccName = ACCUMULATOR_PREFIX + getOperatorConfig().getVertexID();
getRuntimeContext().addAccumulator(rowCountAccName, rowCountAcc);
}
super.close();
closed = true;
}
@Override
public void dispose() throws Exception {
if (!closed) {
close();
}
super.dispose();
}
private boolean isCollectMetricEnabled() {
ExecutionConfig.GlobalJobParameters parameters = getExecutionConfig()
.getGlobalJobParameters();
if (parameters != null) {
String metricsConfig = parameters.toMap().get(METRICS_CONF_KEY);
return metricsConfig != null && metricsConfig
.equalsIgnoreCase(METRICS_CONF_VALUE);
} else {
return false;
}
}
public Configuration getSqlConf() {
if (sqlConf != null) {
return sqlConf;
}
Configuration conf = getContainingTask().getJobConfiguration();
ExecutionConfig.GlobalJobParameters paras = getExecutionConfig().getGlobalJobParameters();
if (paras != null) {
conf.addAll(paras.toMap());
}
this.sqlConf = conf;
return conf;
}
}