MetricsFilter implements BaseFilter.Listener (#10589)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
index 74a4fe4..04bb400 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
@@ -73,4 +73,6 @@
int PROMETHEUS_DEFAULT_PUSH_INTERVAL = 30;
String PROMETHEUS_DEFAULT_JOB_NAME = "default_dubbo_job";
+
+ String METRIC_FILTER_START_TIME = "metric_filter_start_time";
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/DefaultMetricsCollector.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/DefaultMetricsCollector.java
index 599b6c8..6e5ba2d 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/DefaultMetricsCollector.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/DefaultMetricsCollector.java
@@ -17,27 +17,26 @@
package org.apache.dubbo.common.metrics.collector;
-import org.apache.dubbo.common.metrics.event.MetricsEvent;
-import org.apache.dubbo.common.metrics.event.RTEvent;
+import static org.apache.dubbo.common.metrics.model.MetricsCategory.REQUESTS;
+import static org.apache.dubbo.common.metrics.model.MetricsCategory.RT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.dubbo.common.metrics.collector.stat.MetricsStatComposite;
+import org.apache.dubbo.common.metrics.collector.stat.MetricsStatHandler;
import org.apache.dubbo.common.metrics.event.RequestEvent;
import org.apache.dubbo.common.metrics.listener.MetricsListener;
-import org.apache.dubbo.common.metrics.model.MethodMetric;
import org.apache.dubbo.common.metrics.model.MetricsKey;
import org.apache.dubbo.common.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.common.metrics.model.sample.MetricSample;
import org.apache.dubbo.rpc.model.ApplicationModel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAccumulator;
-
-import static org.apache.dubbo.common.metrics.model.MetricsCategory.REQUESTS;
-import static org.apache.dubbo.common.metrics.model.MetricsCategory.RT;
-
/**
* Default implementation of {@link MetricsCollector}
*/
@@ -46,23 +45,11 @@
private AtomicBoolean collectEnabled = new AtomicBoolean(false);
private final List<MetricsListener> listeners = new ArrayList<>();
private final ApplicationModel applicationModel;
- private final String applicationName;
-
- private final Map<MethodMetric, AtomicLong> totalRequests = new ConcurrentHashMap<>();
- private final Map<MethodMetric, AtomicLong> succeedRequests = new ConcurrentHashMap<>();
- private final Map<MethodMetric, AtomicLong> failedRequests = new ConcurrentHashMap<>();
- private final Map<MethodMetric, AtomicLong> processingRequests = new ConcurrentHashMap<>();
-
- private final Map<MethodMetric, AtomicLong> lastRT = new ConcurrentHashMap<>();
- private final Map<MethodMetric, LongAccumulator> minRT = new ConcurrentHashMap<>();
- private final Map<MethodMetric, LongAccumulator> maxRT = new ConcurrentHashMap<>();
- private final Map<MethodMetric, AtomicLong> avgRT = new ConcurrentHashMap<>();
- private final Map<MethodMetric, AtomicLong> totalRT = new ConcurrentHashMap<>();
- private final Map<MethodMetric, AtomicLong> rtCount = new ConcurrentHashMap<>();
+ private final MetricsStatComposite stats;
public DefaultMetricsCollector(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
- this.applicationName = applicationModel.getApplicationName();
+ this.stats = new MetricsStatComposite(applicationModel.getApplicationName(), this);
}
public void setCollectEnabled(Boolean collectEnabled) {
@@ -77,81 +64,51 @@
listeners.add(listener);
}
- public void increaseTotalRequests(String interfaceName, String methodName, String group, String version) {
- if (isCollectEnabled()) {
- MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
- AtomicLong count = totalRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
- count.incrementAndGet();
+ public List<MetricsListener> getListener() {
+ return this.listeners;
+ }
- publishEvent(new RequestEvent(metric, RequestEvent.Type.TOTAL));
- }
+ public void increaseTotalRequests(String interfaceName, String methodName, String group, String version) {
+ doExecute(RequestEvent.Type.TOTAL,statHandler-> {
+ statHandler.increase(interfaceName, methodName, group, version);
+ });
}
public void increaseSucceedRequests(String interfaceName, String methodName, String group, String version) {
- if (isCollectEnabled()) {
- MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
- AtomicLong count = succeedRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
- count.incrementAndGet();
-
- publishEvent(new RequestEvent(metric, RequestEvent.Type.SUCCEED));
- }
+ doExecute(RequestEvent.Type.SUCCEED,statHandler->{
+ statHandler.increase(interfaceName, methodName, group, version);
+ });
}
- public void increaseFailedRequests(String interfaceName, String methodName, String group, String version) {
- if (isCollectEnabled()) {
- MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
- AtomicLong count = failedRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
- count.incrementAndGet();
+ public void increaseFailedRequests(String interfaceName,
+ String methodName,
+ String group,
+ String version) {
+ doExecute(RequestEvent.Type.FAILED,statHandler->{
+ statHandler.increase(interfaceName, methodName, group, version);
+ });
+ }
- publishEvent(new RequestEvent(metric, RequestEvent.Type.FAILED));
- }
+ public void businessFailedRequests(String interfaceName, String methodName, String group, String version) {
+ doExecute(RequestEvent.Type.BUSINESS_FAILED,statHandler->{
+ statHandler.increase(interfaceName, methodName, group, version);
+ });
}
public void increaseProcessingRequests(String interfaceName, String methodName, String group, String version) {
- if (isCollectEnabled()) {
- MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
- AtomicLong count = processingRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
- count.incrementAndGet();
- }
+ doExecute(RequestEvent.Type.PROCESSING,statHandler-> {
+ statHandler.increase(interfaceName, methodName, group, version);
+ });
}
public void decreaseProcessingRequests(String interfaceName, String methodName, String group, String version) {
- if (isCollectEnabled()) {
- MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
- AtomicLong count = processingRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
- count.decrementAndGet();
- }
+ doExecute(RequestEvent.Type.PROCESSING,statHandler-> {
+ statHandler.decrease(interfaceName, methodName, group, version);
+ });
}
public void addRT(String interfaceName, String methodName, String group, String version, Long responseTime) {
- if (isCollectEnabled()) {
- MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
-
- AtomicLong last = lastRT.computeIfAbsent(metric, k -> new AtomicLong());
- last.set(responseTime);
-
- LongAccumulator min = minRT.computeIfAbsent(metric, k -> new LongAccumulator(Long::min, Long.MAX_VALUE));
- min.accumulate(responseTime);
-
- LongAccumulator max = maxRT.computeIfAbsent(metric, k -> new LongAccumulator(Long::max, Long.MIN_VALUE));
- max.accumulate(responseTime);
-
- AtomicLong total = totalRT.computeIfAbsent(metric, k -> new AtomicLong());
- total.addAndGet(responseTime);
-
- AtomicLong count = rtCount.computeIfAbsent(metric, k -> new AtomicLong());
- count.incrementAndGet();
-
- avgRT.computeIfAbsent(metric, k -> new AtomicLong());
-
- publishEvent(new RTEvent(metric, responseTime));
- }
- }
-
- private void publishEvent(MetricsEvent event) {
- for (MetricsListener listener : listeners) {
- listener.onEvent(event);
- }
+ stats.addRT(interfaceName, methodName, group, version, responseTime);
}
@Override
@@ -164,24 +121,51 @@
}
private void collectRequests(List<MetricSample> list) {
- totalRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_TOTAL, k.getTags(), REQUESTS, v::get)));
- succeedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_SUCCEED, k.getTags(), REQUESTS, v::get)));
- failedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_FAILED, k.getTags(), REQUESTS, v::get)));
- processingRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_PROCESSING, k.getTags(), REQUESTS, v::get)));
+ doExecute(RequestEvent.Type.TOTAL, MetricsStatHandler::get).filter(e->!e.isEmpty())
+ .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_TOTAL, k.getTags(), REQUESTS, v::get))));
+
+ doExecute(RequestEvent.Type.SUCCEED, MetricsStatHandler::get).filter(e->!e.isEmpty())
+ .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_SUCCEED, k.getTags(), REQUESTS, v::get))));
+
+ doExecute(RequestEvent.Type.FAILED, MetricsStatHandler::get).filter(e->!e.isEmpty())
+ .ifPresent(map->{
+ map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_FAILED, k.getTags(), REQUESTS, v::get)));
+ });
+
+ doExecute(RequestEvent.Type.PROCESSING, MetricsStatHandler::get).filter(e->!e.isEmpty())
+ .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_PROCESSING, k.getTags(), REQUESTS, v::get))));
+
+ doExecute(RequestEvent.Type.BUSINESS_FAILED, MetricsStatHandler::get).filter(e->!e.isEmpty())
+ .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUEST_BUSINESS_FAILED, k.getTags(), REQUESTS, v::get))));
}
private void collectRT(List<MetricSample> list) {
- lastRT.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_LAST, k.getTags(), RT, v::get)));
- minRT.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MIN, k.getTags(), RT, v::get)));
- maxRT.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MAX, k.getTags(), RT, v::get)));
+ this.stats.getLastRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_LAST, k.getTags(), RT, v::get)));
+ this.stats.getMinRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MIN, k.getTags(), RT, v::get)));
+ this.stats.getMaxRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MAX, k.getTags(), RT, v::get)));
- totalRT.forEach((k, v) -> {
+ this.stats.getTotalRT().forEach((k, v) -> {
list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_TOTAL, k.getTags(), RT, v::get));
- AtomicLong avg = avgRT.get(k);
- AtomicLong count = rtCount.get(k);
+ AtomicLong avg = this.stats.getAvgRT().get(k);
+ AtomicLong count = this.stats.getRtCount().get(k);
avg.set(v.get() / count.get());
list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_AVG, k.getTags(), RT, avg::get));
});
}
+ private <T> Optional<T> doExecute(RequestEvent.Type requestType, Function<MetricsStatHandler,T> statExecutor) {
+ if (isCollectEnabled()) {
+ MetricsStatHandler handler = stats.getHandler(requestType);
+ T result = statExecutor.apply(handler);
+ return Optional.ofNullable(result);
+ }
+ return Optional.empty();
+ }
+
+ private void doExecute(RequestEvent.Type requestType, Consumer<MetricsStatHandler> statExecutor) {
+ if (isCollectEnabled()) {
+ MetricsStatHandler handler = stats.getHandler(requestType);
+ statExecutor.accept(handler);
+ }
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/DefaultMetricsStatHandler.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/DefaultMetricsStatHandler.java
new file mode 100644
index 0000000..1670b9d
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/DefaultMetricsStatHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.metrics.collector.stat;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.dubbo.common.metrics.model.MethodMetric;
+
+
+public class DefaultMetricsStatHandler implements MetricsStatHandler {
+
+ private final String applicationName;
+ private final Map<MethodMetric, AtomicLong> counts = new ConcurrentHashMap<>();
+
+ public DefaultMetricsStatHandler(String applicationName) {
+ this.applicationName = applicationName;
+ }
+
+ @Override
+ public void increase(String interfaceName, String methodName, String group, String version) {
+ this.doIncrExecute(interfaceName,methodName,group,version);
+ }
+
+ public void decrease(String interfaceName, String methodName, String group, String version){
+ this.doDecrExecute(interfaceName,methodName,group,version);
+ }
+
+ protected void doExecute(String interfaceName, String methodName, String group, String version, BiConsumer<MethodMetric,Map<MethodMetric, AtomicLong>> execute){
+ MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
+ execute.accept(metric,counts);
+
+ this.doNotify(metric);
+ }
+
+ protected void doIncrExecute(String interfaceName, String methodName, String group, String version){
+ this.doExecute(interfaceName,methodName,group,version,(metric,counts)->{
+ AtomicLong count = counts.computeIfAbsent(metric, k -> new AtomicLong(0L));
+ count.incrementAndGet();
+
+ });
+ }
+
+ protected void doDecrExecute(String interfaceName, String methodName, String group, String version){
+ this.doExecute(interfaceName,methodName,group,version,(metric,counts)->{
+ AtomicLong count = counts.computeIfAbsent(metric, k -> new AtomicLong(0L));
+ count.decrementAndGet();
+ });
+ }
+
+ @Override
+ public Map<MethodMetric, AtomicLong> get() {
+ return counts;
+ }
+
+ public void doNotify(MethodMetric metric){}
+
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatComposite.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatComposite.java
new file mode 100644
index 0000000..d86a206
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatComposite.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.metrics.collector.stat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAccumulator;
+
+import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.common.metrics.event.MetricsEvent;
+import org.apache.dubbo.common.metrics.event.RTEvent;
+import org.apache.dubbo.common.metrics.event.RequestEvent;
+import org.apache.dubbo.common.metrics.listener.MetricsListener;
+import org.apache.dubbo.common.metrics.model.MethodMetric;
+public class MetricsStatComposite{
+
+ public Map<RequestEvent.Type, MetricsStatHandler> stats = new ConcurrentHashMap<>();
+ private final Map<MethodMetric, AtomicLong> lastRT = new ConcurrentHashMap<>();
+ private final Map<MethodMetric, LongAccumulator> minRT = new ConcurrentHashMap<>();
+ private final Map<MethodMetric, LongAccumulator> maxRT = new ConcurrentHashMap<>();
+ private final Map<MethodMetric, AtomicLong> avgRT = new ConcurrentHashMap<>();
+ private final Map<MethodMetric, AtomicLong> totalRT = new ConcurrentHashMap<>();
+ private final Map<MethodMetric, AtomicLong> rtCount = new ConcurrentHashMap<>();
+ private final String applicationName;
+ private final List<MetricsListener> listeners;
+ private DefaultMetricsCollector collector;
+
+ public MetricsStatComposite(String applicationName, DefaultMetricsCollector collector){
+ this.applicationName = applicationName;
+ this.listeners = collector.getListener();
+ this.collector = collector;
+ this.init();
+ }
+
+ public MetricsStatHandler getHandler(RequestEvent.Type statType) {
+ return stats.get(statType);
+ }
+
+ public Map<MethodMetric, AtomicLong> getLastRT(){
+ return this.lastRT;
+ }
+ public Map<MethodMetric, LongAccumulator> getMinRT(){
+ return this.minRT;
+ }
+
+ public Map<MethodMetric, LongAccumulator> getMaxRT(){
+ return this.maxRT;
+ }
+ public Map<MethodMetric, AtomicLong> getAvgRT(){
+ return this.avgRT;
+ }
+ public Map<MethodMetric, AtomicLong> getTotalRT(){
+ return this.totalRT;
+ }
+ public Map<MethodMetric, AtomicLong> getRtCount(){
+ return this.rtCount;
+ }
+
+ public void addRT(String interfaceName, String methodName, String group, String version, Long responseTime) {
+ if (collector.isCollectEnabled()) {
+ MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
+
+ AtomicLong last = lastRT.computeIfAbsent(metric, k -> new AtomicLong());
+ last.set(responseTime);
+
+ LongAccumulator min = minRT.computeIfAbsent(metric, k -> new LongAccumulator(Long::min, Long.MAX_VALUE));
+ min.accumulate(responseTime);
+
+ LongAccumulator max = maxRT.computeIfAbsent(metric, k -> new LongAccumulator(Long::max, Long.MIN_VALUE));
+ max.accumulate(responseTime);
+
+ AtomicLong total = totalRT.computeIfAbsent(metric, k -> new AtomicLong());
+ total.addAndGet(responseTime);
+
+ AtomicLong count = rtCount.computeIfAbsent(metric, k -> new AtomicLong());
+ count.incrementAndGet();
+
+ avgRT.computeIfAbsent(metric, k -> new AtomicLong());
+
+ publishEvent(new RTEvent(metric, responseTime));
+ }
+ }
+
+ private void init() {
+ stats.put(RequestEvent.Type.TOTAL, new DefaultMetricsStatHandler(applicationName){
+ @Override
+ public void doNotify(MethodMetric metric) {
+ publishEvent(new RequestEvent(metric, RequestEvent.Type.TOTAL));
+ }
+ });
+
+ stats.put(RequestEvent.Type.SUCCEED, new DefaultMetricsStatHandler(applicationName) {
+ @Override
+ public void doNotify(MethodMetric metric) {
+ publishEvent(new RequestEvent(metric, RequestEvent.Type.SUCCEED));
+ }
+ });
+
+ stats.put(RequestEvent.Type.FAILED, new DefaultMetricsStatHandler(applicationName) {
+ @Override
+ public void doNotify(MethodMetric metric) {
+ publishEvent(new RequestEvent(metric, RequestEvent.Type.FAILED));
+ }
+ });
+
+ stats.put(RequestEvent.Type.BUSINESS_FAILED, new DefaultMetricsStatHandler(applicationName) {
+ @Override
+ public void doNotify(MethodMetric metric) {
+ publishEvent(new RequestEvent(metric, RequestEvent.Type.BUSINESS_FAILED));
+ }
+ });
+
+ stats.put(RequestEvent.Type.PROCESSING, new DefaultMetricsStatHandler(applicationName));
+ }
+
+ private void publishEvent(MetricsEvent event) {
+ for (MetricsListener listener : listeners) {
+ listener.onEvent(event);
+ }
+ }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatHandler.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatHandler.java
new file mode 100644
index 0000000..d39c5d4
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.metrics.collector.stat;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.dubbo.common.metrics.model.MethodMetric;
+
+public interface MetricsStatHandler {
+ Map<MethodMetric, AtomicLong> get();
+ void increase(String interfaceName, String methodName, String group, String version);
+ void decrease(String interfaceName, String methodName, String group, String version);
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/event/RequestEvent.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/event/RequestEvent.java
index b1d050b..f0a6677 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/event/RequestEvent.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/event/RequestEvent.java
@@ -40,6 +40,8 @@
TOTAL,
SUCCEED,
FAILED,
+ BUSINESS_FAILED,
+
PROCESSING
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/model/MetricsKey.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/model/MetricsKey.java
index 07816a3..bbb1bd8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/model/MetricsKey.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/model/MetricsKey.java
@@ -22,10 +22,14 @@
METRIC_REQUESTS_TOTAL("requests.total", "Total Requests"),
METRIC_REQUESTS_SUCCEED("requests.succeed", "Succeed Requests"),
METRIC_REQUESTS_FAILED("requests.failed", "Failed Requests"),
+ METRIC_REQUEST_BUSINESS_FAILED("requests.business.failed","Failed Business Requests"),
METRIC_REQUESTS_PROCESSING("requests.processing", "Processing Requests"),
+
METRIC_REQUESTS_TOTAL_AGG("requests.total.aggregate", "Aggregated Total Requests"),
METRIC_REQUESTS_SUCCEED_AGG("requests.succeed.aggregate", "Aggregated Succeed Requests"),
METRIC_REQUESTS_FAILED_AGG("requests.failed.aggregate", "Aggregated Failed Requests"),
+ METRIC_REQUESTS_BUSINESS_FAILED_AGG("requests.business.failed.aggregate", "Aggregated Business Failed Requests"),
+
METRIC_QPS("qps", "Query Per Seconds"),
METRIC_RT_LAST("rt.last", "Last Response Time"),
METRIC_RT_MIN("rt.min", "Min Response Time"),
diff --git a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
index b473d1f..302c851 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
@@ -54,6 +54,7 @@
private final Map<MethodMetric, TimeWindowCounter> totalRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> succeedRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> failedRequests = new ConcurrentHashMap<>();
+ private final Map<MethodMetric, TimeWindowCounter> businessFailedRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> qps = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowQuantile> rt = new ConcurrentHashMap<>();
@@ -113,6 +114,10 @@
case FAILED:
counter = failedRequests.computeIfAbsent(metric, k -> new TimeWindowCounter(bucketNum, timeWindowSeconds));
break;
+ case BUSINESS_FAILED:
+ counter = businessFailedRequests.computeIfAbsent(metric, k -> new TimeWindowCounter(bucketNum, timeWindowSeconds));
+ break;
+
default:
break;
}
@@ -136,6 +141,7 @@
totalRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_TOTAL_AGG, k.getTags(), REQUESTS, v::get)));
succeedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_SUCCEED_AGG, k.getTags(), REQUESTS, v::get)));
failedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_FAILED_AGG, k.getTags(), REQUESTS, v::get)));
+ businessFailedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_BUSINESS_FAILED_AGG, k.getTags(), REQUESTS, v::get)));
}
private void collectQPS(List<MetricSample> list) {
diff --git a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsCollectExecutor.java b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsCollectExecutor.java
new file mode 100644
index 0000000..0ae3fdf
--- /dev/null
+++ b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsCollectExecutor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.filter;
+
+import static org.apache.dubbo.common.constants.MetricsConstants.METRIC_FILTER_START_TIME;
+
+import java.util.function.Supplier;
+
+import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+
+public class MetricsCollectExecutor {
+
+ private final DefaultMetricsCollector collector;
+ private final Invocation invocation;
+ private String interfaceName;
+ private String methodName;
+ private String group;
+ private String version;
+
+
+ public MetricsCollectExecutor(DefaultMetricsCollector collector, Invocation invocation) {
+ init(invocation);
+
+ this.collector = collector;
+
+ this.invocation = invocation;
+ }
+
+ public void beforeExecute() {
+ collector.increaseTotalRequests(interfaceName, methodName, group, version);
+ collector.increaseProcessingRequests(interfaceName, methodName, group, version);
+ invocation.put(METRIC_FILTER_START_TIME, System.currentTimeMillis());
+ }
+
+ public void postExecute(Result result) {
+ if (result.hasException()) {
+ this.throwExecute(result.getException());
+ return;
+ }
+ collector.increaseSucceedRequests(interfaceName, methodName, group, version);
+ endExecute();
+ }
+
+ public void throwExecute(Throwable throwable){
+ if (throwable instanceof RpcException) {
+ RpcException rpcException = (RpcException)throwable;
+ if (rpcException.isBiz()) {
+ collector.businessFailedRequests(interfaceName, methodName, group, version);
+ }else{
+ collector.increaseFailedRequests(interfaceName, methodName, group, version);
+ }
+ }
+ endExecute(()-> throwable instanceof RpcException && ((RpcException) throwable).isBiz());
+ }
+
+ private void endExecute(){
+ this.endExecute(() -> true);
+ }
+
+ private void endExecute(Supplier<Boolean> rtStat){
+ if (rtStat.get()) {
+ Long endTime = System.currentTimeMillis();
+ Long beginTime = (Long) invocation.get(METRIC_FILTER_START_TIME);
+ Long rt = endTime - beginTime;
+ collector.addRT(interfaceName, methodName, group, version, rt);
+ }
+ collector.decreaseProcessingRequests(interfaceName, methodName, group, version);
+ }
+
+ private void init(Invocation invocation) {
+ String serviceUniqueName = invocation.getTargetServiceUniqueName();
+ String methodName = invocation.getMethodName();
+ String group = null;
+ String interfaceAndVersion;
+ String[] arr = serviceUniqueName.split("/");
+ if (arr.length == 2) {
+ group = arr[0];
+ interfaceAndVersion = arr[1];
+ } else {
+ interfaceAndVersion = arr[0];
+ }
+
+ String[] ivArr = interfaceAndVersion.split(":");
+ String interfaceName = ivArr[0];
+ String version = ivArr.length == 2 ? ivArr[1] : null;
+
+ this.interfaceName = interfaceName;
+ this.methodName = methodName;
+ this.group = group;
+ this.version = version;
+ }
+}
diff --git a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsFilter.java b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsFilter.java
index 031f8ef..4e5cc13 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsFilter.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsFilter.java
@@ -14,11 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.dubbo.metrics.filter;
+import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
+
+import java.util.function.Consumer;
+
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
@@ -27,19 +31,17 @@
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
-import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
-
@Activate(group = PROVIDER, order = -1)
-public class MetricsFilter implements Filter, ScopeModelAware {
+public class MetricsFilter implements Filter, BaseFilter.Listener, ScopeModelAware {
private DefaultMetricsCollector collector = null;
private ApplicationModel applicationModel;
+
@Override
public void setApplicationModel(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
-
collector = applicationModel.getBeanFactory().getBean(DefaultMetricsCollector.class);
}
@@ -48,38 +50,23 @@
if (collector == null || !collector.isCollectEnabled()) {
return invoker.invoke(invocation);
}
+ collect(invocation, MetricsCollectExecutor::beforeExecute);
- String serviceUniqueName = invocation.getTargetServiceUniqueName();
- String methodName = invocation.getMethodName();
- String group = null;
- String interfaceAndVersion;
- String[] arr = serviceUniqueName.split("/");
- if (arr.length == 2) {
- group = arr[0];
- interfaceAndVersion = arr[1];
- } else {
- interfaceAndVersion = arr[0];
- }
+ return invoker.invoke(invocation);
+ }
- String[] ivArr = interfaceAndVersion.split(":");
- String interfaceName = ivArr[0];
- String version = ivArr.length == 2 ? ivArr[1] : null;
- collector.increaseTotalRequests(interfaceName, methodName, group, version);
- collector.increaseProcessingRequests(interfaceName, methodName, group, version);
+ @Override
+ public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
+ collect(invocation, collector->collector.postExecute(result));
+ }
- Long startTime = System.currentTimeMillis();
- try {
- Result invoke = invoker.invoke(invocation);
- collector.increaseSucceedRequests(interfaceName, methodName, group, version);
- return invoke;
- } catch (RpcException e) {
- collector.increaseFailedRequests(interfaceName, methodName, group, version);
- throw e;
- } finally {
- Long endTime = System.currentTimeMillis();
- Long rt = endTime - startTime;
- collector.addRT(interfaceName, methodName, group, version, rt);
- collector.decreaseProcessingRequests(interfaceName, methodName, group, version);
- }
+ @Override
+ public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+ collect(invocation,collector-> collector.throwExecute(t));
+ }
+
+ private void collect(Invocation invocation, Consumer<MetricsCollectExecutor> execute) {
+ MetricsCollectExecutor collectorExecutor = new MetricsCollectExecutor(collector, invocation);
+ execute.accept(collectorExecutor);
}
}
diff --git a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
index 308e238..0b3317a 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
@@ -18,6 +18,7 @@
package org.apache.dubbo.metrics.collector;
import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.common.metrics.model.MetricsKey;
import org.apache.dubbo.common.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.common.metrics.model.sample.MetricSample;
import org.apache.dubbo.config.ApplicationConfig;
@@ -81,6 +82,7 @@
defaultCollector.increaseTotalRequests(interfaceName, methodName, group, version);
defaultCollector.increaseSucceedRequests(interfaceName, methodName, group, version);
defaultCollector.increaseFailedRequests(interfaceName, methodName, group, version);
+ defaultCollector.businessFailedRequests(interfaceName,methodName,group,version);
List<MetricSample> samples = collector.collect();
for (MetricSample sample : samples) {
@@ -101,6 +103,8 @@
Assertions.assertEquals(sampleMap.get("requests.total.aggregate"), 1L);
Assertions.assertEquals(sampleMap.get("requests.succeed.aggregate"), 1L);
Assertions.assertEquals(sampleMap.get("requests.failed.aggregate"), 1L);
+ Assertions.assertEquals(sampleMap.get(MetricsKey.METRIC_REQUESTS_BUSINESS_FAILED_AGG.getName()), 1L);
+
Assertions.assertTrue(sampleMap.containsKey("qps"));
}
diff --git a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
index 2e86a83..dbe6806 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
@@ -17,11 +17,25 @@
package org.apache.dubbo.metrics.filter;
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_GROUP_KEY;
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_METHOD_KEY;
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_VERSION_KEY;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.common.metrics.model.MetricsKey;
import org.apache.dubbo.common.metrics.model.sample.MetricSample;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -30,15 +44,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.apache.dubbo.common.constants.MetricsConstants.*;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.mock;
-
public class MetricsFilterTest {
private ApplicationModel applicationModel;
@@ -89,8 +94,9 @@
try {
filter.invoke(invoker, invocation);
- } catch (Exception ignore) {
-
+ } catch (Exception e) {
+ Assertions.assertTrue(e instanceof RpcException);
+ filter.onError(e, invoker, invocation);
}
Map<String, MetricSample> metricsMap = getMetricsMap();
@@ -106,13 +112,45 @@
Assertions.assertEquals(tags.get(TAG_VERSION_KEY), VERSION);
}
+
+ @Test
+ public void testBusinessFailedRequests() {
+ collector.setCollectEnabled(true);
+
+ given(invoker.invoke(invocation)).willThrow(new RpcException(RpcException.BIZ_EXCEPTION));
+ initParam();
+
+ try {
+ filter.invoke(invoker, invocation);
+ } catch (Exception e) {
+ Assertions.assertTrue(e instanceof RpcException);
+ filter.onError(e, invoker, invocation);
+ }
+
+ Map<String, MetricSample> metricsMap = getMetricsMap();
+ Assertions.assertTrue(metricsMap.containsKey(MetricsKey.METRIC_REQUEST_BUSINESS_FAILED.getName()));
+ Assertions.assertFalse(metricsMap.containsKey("requests.succeed"));
+
+ MetricSample sample = metricsMap.get(MetricsKey.METRIC_REQUEST_BUSINESS_FAILED.getName());
+
+ Map<String, String> tags = sample.getTags();
+
+ Assertions.assertEquals(tags.get(TAG_INTERFACE_KEY), INTERFACE_NAME);
+ Assertions.assertEquals(tags.get(TAG_METHOD_KEY), METHOD_NAME);
+ Assertions.assertEquals(tags.get(TAG_GROUP_KEY), GROUP);
+ Assertions.assertEquals(tags.get(TAG_VERSION_KEY), VERSION);
+ }
+
@Test
public void testSucceedRequests() {
collector.setCollectEnabled(true);
given(invoker.invoke(invocation)).willReturn(new AppResponse("success"));
initParam();
- filter.invoke(invoker, invocation);
+ Result result = filter.invoke(invoker, invocation);
+
+ filter.onResponse(result, invoker, invocation);
+
Map<String, MetricSample> metricsMap = getMetricsMap();
Assertions.assertFalse(metricsMap.containsKey("requests.failed"));
Assertions.assertTrue(metricsMap.containsKey("requests.succeed"));
@@ -134,7 +172,10 @@
invocation.setMethodName(METHOD_NAME);
invocation.setParameterTypes(new Class[]{String.class});
- filter.invoke(invoker, invocation);
+ Result result = filter.invoke(invoker, invocation);
+
+ filter.onResponse(result, invoker, invocation);
+
Map<String, MetricSample> metricsMap = getMetricsMap();
MetricSample sample = metricsMap.get("requests.succeed");
@@ -154,7 +195,10 @@
invocation.setMethodName(METHOD_NAME);
invocation.setParameterTypes(new Class[]{String.class});
- filter.invoke(invoker, invocation);
+ Result result = filter.invoke(invoker, invocation);
+
+ filter.onResponse(result, invoker, invocation);
+
Map<String, MetricSample> metricsMap = getMetricsMap();
MetricSample sample = metricsMap.get("requests.succeed");
@@ -174,7 +218,10 @@
invocation.setMethodName(METHOD_NAME);
invocation.setParameterTypes(new Class[]{String.class});
- filter.invoke(invoker, invocation);
+ Result result = filter.invoke(invoker, invocation);
+
+ filter.onResponse(result, invoker, invocation);
+
Map<String, MetricSample> metricsMap = getMetricsMap();
MetricSample sample = metricsMap.get("requests.succeed");