MetricsFilter implements BaseFilter.Listener (#10589)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
index 74a4fe4..04bb400 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
@@ -73,4 +73,6 @@
     int PROMETHEUS_DEFAULT_PUSH_INTERVAL = 30;
 
     String PROMETHEUS_DEFAULT_JOB_NAME = "default_dubbo_job";
+
+    String METRIC_FILTER_START_TIME = "metric_filter_start_time";
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/DefaultMetricsCollector.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/DefaultMetricsCollector.java
index 599b6c8..6e5ba2d 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/DefaultMetricsCollector.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/DefaultMetricsCollector.java
@@ -17,27 +17,26 @@
 
 package org.apache.dubbo.common.metrics.collector;
 
-import org.apache.dubbo.common.metrics.event.MetricsEvent;
-import org.apache.dubbo.common.metrics.event.RTEvent;
+import static org.apache.dubbo.common.metrics.model.MetricsCategory.REQUESTS;
+import static org.apache.dubbo.common.metrics.model.MetricsCategory.RT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.dubbo.common.metrics.collector.stat.MetricsStatComposite;
+import org.apache.dubbo.common.metrics.collector.stat.MetricsStatHandler;
 import org.apache.dubbo.common.metrics.event.RequestEvent;
 import org.apache.dubbo.common.metrics.listener.MetricsListener;
-import org.apache.dubbo.common.metrics.model.MethodMetric;
 import org.apache.dubbo.common.metrics.model.MetricsKey;
 import org.apache.dubbo.common.metrics.model.sample.GaugeMetricSample;
 import org.apache.dubbo.common.metrics.model.sample.MetricSample;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAccumulator;
-
-import static org.apache.dubbo.common.metrics.model.MetricsCategory.REQUESTS;
-import static org.apache.dubbo.common.metrics.model.MetricsCategory.RT;
-
 /**
  * Default implementation of {@link MetricsCollector}
  */
@@ -46,23 +45,11 @@
     private AtomicBoolean collectEnabled = new AtomicBoolean(false);
     private final List<MetricsListener> listeners = new ArrayList<>();
     private final ApplicationModel applicationModel;
-    private final String applicationName;
-
-    private final Map<MethodMetric, AtomicLong> totalRequests = new ConcurrentHashMap<>();
-    private final Map<MethodMetric, AtomicLong> succeedRequests = new ConcurrentHashMap<>();
-    private final Map<MethodMetric, AtomicLong> failedRequests = new ConcurrentHashMap<>();
-    private final Map<MethodMetric, AtomicLong> processingRequests = new ConcurrentHashMap<>();
-
-    private final Map<MethodMetric, AtomicLong> lastRT = new ConcurrentHashMap<>();
-    private final Map<MethodMetric, LongAccumulator> minRT = new ConcurrentHashMap<>();
-    private final Map<MethodMetric, LongAccumulator> maxRT = new ConcurrentHashMap<>();
-    private final Map<MethodMetric, AtomicLong> avgRT = new ConcurrentHashMap<>();
-    private final Map<MethodMetric, AtomicLong> totalRT = new ConcurrentHashMap<>();
-    private final Map<MethodMetric, AtomicLong> rtCount = new ConcurrentHashMap<>();
+    private final MetricsStatComposite stats;
 
     public DefaultMetricsCollector(ApplicationModel applicationModel) {
         this.applicationModel = applicationModel;
-        this.applicationName = applicationModel.getApplicationName();
+        this.stats = new MetricsStatComposite(applicationModel.getApplicationName(), this);
     }
 
     public void setCollectEnabled(Boolean collectEnabled) {
@@ -77,81 +64,51 @@
         listeners.add(listener);
     }
 
-    public void increaseTotalRequests(String interfaceName, String methodName, String group, String version) {
-        if (isCollectEnabled()) {
-            MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
-            AtomicLong count = totalRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
-            count.incrementAndGet();
+    public List<MetricsListener> getListener() {
+        return this.listeners;
+    }
 
-            publishEvent(new RequestEvent(metric, RequestEvent.Type.TOTAL));
-        }
+    public void increaseTotalRequests(String interfaceName, String methodName, String group, String version) {
+        doExecute(RequestEvent.Type.TOTAL,statHandler-> {
+            statHandler.increase(interfaceName, methodName, group, version);
+        });
     }
 
     public void increaseSucceedRequests(String interfaceName, String methodName, String group, String version) {
-        if (isCollectEnabled()) {
-            MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
-            AtomicLong count = succeedRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
-            count.incrementAndGet();
-
-            publishEvent(new RequestEvent(metric, RequestEvent.Type.SUCCEED));
-        }
+        doExecute(RequestEvent.Type.SUCCEED,statHandler->{
+            statHandler.increase(interfaceName, methodName, group, version);
+        });
     }
 
-    public void increaseFailedRequests(String interfaceName, String methodName, String group, String version) {
-        if (isCollectEnabled()) {
-            MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
-            AtomicLong count = failedRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
-            count.incrementAndGet();
+    public void increaseFailedRequests(String interfaceName,
+                                       String methodName,
+                                       String group,
+                                       String version) {
+        doExecute(RequestEvent.Type.FAILED,statHandler->{
+            statHandler.increase(interfaceName, methodName, group, version);
+        });
+    }
 
-            publishEvent(new RequestEvent(metric, RequestEvent.Type.FAILED));
-        }
+    public void businessFailedRequests(String interfaceName, String methodName, String group, String version) {
+        doExecute(RequestEvent.Type.BUSINESS_FAILED,statHandler->{
+            statHandler.increase(interfaceName, methodName, group, version);
+        });
     }
 
     public void increaseProcessingRequests(String interfaceName, String methodName, String group, String version) {
-        if (isCollectEnabled()) {
-            MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
-            AtomicLong count = processingRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
-            count.incrementAndGet();
-        }
+        doExecute(RequestEvent.Type.PROCESSING,statHandler-> {
+            statHandler.increase(interfaceName, methodName, group, version);
+        });
     }
 
     public void decreaseProcessingRequests(String interfaceName, String methodName, String group, String version) {
-        if (isCollectEnabled()) {
-            MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
-            AtomicLong count = processingRequests.computeIfAbsent(metric, k -> new AtomicLong(0L));
-            count.decrementAndGet();
-        }
+        doExecute(RequestEvent.Type.PROCESSING,statHandler-> {
+            statHandler.decrease(interfaceName, methodName, group, version);
+        });
     }
 
     public void addRT(String interfaceName, String methodName, String group, String version, Long responseTime) {
-        if (isCollectEnabled()) {
-            MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
-
-            AtomicLong last = lastRT.computeIfAbsent(metric, k -> new AtomicLong());
-            last.set(responseTime);
-
-            LongAccumulator min = minRT.computeIfAbsent(metric, k -> new LongAccumulator(Long::min, Long.MAX_VALUE));
-            min.accumulate(responseTime);
-
-            LongAccumulator max = maxRT.computeIfAbsent(metric, k -> new LongAccumulator(Long::max, Long.MIN_VALUE));
-            max.accumulate(responseTime);
-
-            AtomicLong total = totalRT.computeIfAbsent(metric, k -> new AtomicLong());
-            total.addAndGet(responseTime);
-
-            AtomicLong count = rtCount.computeIfAbsent(metric, k -> new AtomicLong());
-            count.incrementAndGet();
-
-            avgRT.computeIfAbsent(metric, k -> new AtomicLong());
-
-            publishEvent(new RTEvent(metric, responseTime));
-        }
-    }
-
-    private void publishEvent(MetricsEvent event) {
-        for (MetricsListener listener : listeners) {
-            listener.onEvent(event);
-        }
+        stats.addRT(interfaceName, methodName, group, version, responseTime);
     }
 
     @Override
@@ -164,24 +121,51 @@
     }
 
     private void collectRequests(List<MetricSample> list) {
-        totalRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_TOTAL, k.getTags(), REQUESTS, v::get)));
-        succeedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_SUCCEED, k.getTags(), REQUESTS, v::get)));
-        failedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_FAILED, k.getTags(), REQUESTS, v::get)));
-        processingRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_PROCESSING, k.getTags(), REQUESTS, v::get)));
+        doExecute(RequestEvent.Type.TOTAL, MetricsStatHandler::get).filter(e->!e.isEmpty())
+            .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_TOTAL, k.getTags(), REQUESTS, v::get))));
+
+        doExecute(RequestEvent.Type.SUCCEED, MetricsStatHandler::get).filter(e->!e.isEmpty())
+            .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_SUCCEED, k.getTags(), REQUESTS, v::get))));
+
+        doExecute(RequestEvent.Type.FAILED, MetricsStatHandler::get).filter(e->!e.isEmpty())
+            .ifPresent(map->{
+                map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_FAILED, k.getTags(), REQUESTS, v::get)));
+            });
+
+        doExecute(RequestEvent.Type.PROCESSING, MetricsStatHandler::get).filter(e->!e.isEmpty())
+            .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_PROCESSING, k.getTags(), REQUESTS, v::get))));
+
+        doExecute(RequestEvent.Type.BUSINESS_FAILED, MetricsStatHandler::get).filter(e->!e.isEmpty())
+            .ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUEST_BUSINESS_FAILED, k.getTags(), REQUESTS, v::get))));
     }
 
     private void collectRT(List<MetricSample> list) {
-        lastRT.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_LAST, k.getTags(), RT, v::get)));
-        minRT.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MIN, k.getTags(), RT, v::get)));
-        maxRT.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MAX, k.getTags(), RT, v::get)));
+        this.stats.getLastRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_LAST, k.getTags(), RT, v::get)));
+        this.stats.getMinRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MIN, k.getTags(), RT, v::get)));
+        this.stats.getMaxRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_MAX, k.getTags(), RT, v::get)));
 
-        totalRT.forEach((k, v) -> {
+        this.stats.getTotalRT().forEach((k, v) -> {
             list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_TOTAL, k.getTags(), RT, v::get));
 
-            AtomicLong avg = avgRT.get(k);
-            AtomicLong count = rtCount.get(k);
+            AtomicLong avg = this.stats.getAvgRT().get(k);
+            AtomicLong count = this.stats.getRtCount().get(k);
             avg.set(v.get() / count.get());
             list.add(new GaugeMetricSample(MetricsKey.METRIC_RT_AVG, k.getTags(), RT, avg::get));
         });
     }
+    private <T> Optional<T> doExecute(RequestEvent.Type requestType, Function<MetricsStatHandler,T> statExecutor) {
+        if (isCollectEnabled()) {
+            MetricsStatHandler handler = stats.getHandler(requestType);
+            T result =  statExecutor.apply(handler);
+            return Optional.ofNullable(result);
+        }
+        return Optional.empty();
+    }
+
+    private void doExecute(RequestEvent.Type requestType, Consumer<MetricsStatHandler> statExecutor) {
+        if (isCollectEnabled()) {
+            MetricsStatHandler handler = stats.getHandler(requestType);
+             statExecutor.accept(handler);
+        }
+    }
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/DefaultMetricsStatHandler.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/DefaultMetricsStatHandler.java
new file mode 100644
index 0000000..1670b9d
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/DefaultMetricsStatHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.metrics.collector.stat;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.dubbo.common.metrics.model.MethodMetric;
+
+
+public class DefaultMetricsStatHandler implements MetricsStatHandler {
+
+    private final String applicationName;
+    private final Map<MethodMetric, AtomicLong> counts = new ConcurrentHashMap<>();
+
+    public DefaultMetricsStatHandler(String applicationName) {
+        this.applicationName = applicationName;
+    }
+
+    @Override
+    public void increase(String interfaceName, String methodName, String group, String version) {
+        this.doIncrExecute(interfaceName,methodName,group,version);
+    }
+
+    public void decrease(String interfaceName, String methodName, String group, String version){
+        this.doDecrExecute(interfaceName,methodName,group,version);
+    }
+
+    protected void doExecute(String interfaceName, String methodName, String group, String version, BiConsumer<MethodMetric,Map<MethodMetric, AtomicLong>> execute){
+        MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
+        execute.accept(metric,counts);
+
+        this.doNotify(metric);
+    }
+
+    protected void doIncrExecute(String interfaceName, String methodName, String group, String version){
+        this.doExecute(interfaceName,methodName,group,version,(metric,counts)->{
+            AtomicLong count = counts.computeIfAbsent(metric, k -> new AtomicLong(0L));
+            count.incrementAndGet();
+
+        });
+    }
+
+    protected void doDecrExecute(String interfaceName, String methodName, String group, String version){
+        this.doExecute(interfaceName,methodName,group,version,(metric,counts)->{
+            AtomicLong count = counts.computeIfAbsent(metric, k -> new AtomicLong(0L));
+            count.decrementAndGet();
+        });
+    }
+
+    @Override
+    public Map<MethodMetric, AtomicLong> get() {
+        return counts;
+    }
+
+    public  void doNotify(MethodMetric metric){}
+
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatComposite.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatComposite.java
new file mode 100644
index 0000000..d86a206
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatComposite.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.metrics.collector.stat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAccumulator;
+
+import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.common.metrics.event.MetricsEvent;
+import org.apache.dubbo.common.metrics.event.RTEvent;
+import org.apache.dubbo.common.metrics.event.RequestEvent;
+import org.apache.dubbo.common.metrics.listener.MetricsListener;
+import org.apache.dubbo.common.metrics.model.MethodMetric;
+public class MetricsStatComposite{
+
+    public Map<RequestEvent.Type, MetricsStatHandler> stats = new ConcurrentHashMap<>();
+    private final Map<MethodMetric, AtomicLong>     lastRT = new ConcurrentHashMap<>();
+    private final Map<MethodMetric, LongAccumulator> minRT  = new ConcurrentHashMap<>();
+    private final Map<MethodMetric, LongAccumulator> maxRT  = new ConcurrentHashMap<>();
+    private final Map<MethodMetric, AtomicLong> avgRT = new ConcurrentHashMap<>();
+    private final Map<MethodMetric, AtomicLong> totalRT = new ConcurrentHashMap<>();
+    private final Map<MethodMetric, AtomicLong> rtCount = new ConcurrentHashMap<>();
+    private final String applicationName;
+    private final List<MetricsListener> listeners;
+    private DefaultMetricsCollector collector;
+
+    public MetricsStatComposite(String applicationName, DefaultMetricsCollector collector){
+        this.applicationName = applicationName;
+        this.listeners = collector.getListener();
+        this.collector = collector;
+        this.init();
+    }
+
+    public MetricsStatHandler getHandler(RequestEvent.Type statType) {
+        return stats.get(statType);
+    }
+
+    public Map<MethodMetric, AtomicLong> getLastRT(){
+        return this.lastRT;
+    }
+    public Map<MethodMetric, LongAccumulator> getMinRT(){
+        return this.minRT;
+    }
+
+    public Map<MethodMetric, LongAccumulator> getMaxRT(){
+        return this.maxRT;
+    }
+    public Map<MethodMetric, AtomicLong> getAvgRT(){
+        return this.avgRT;
+    }
+    public Map<MethodMetric, AtomicLong> getTotalRT(){
+        return this.totalRT;
+    }
+    public Map<MethodMetric, AtomicLong> getRtCount(){
+        return this.rtCount;
+    }
+
+    public void addRT(String interfaceName, String methodName, String group, String version, Long responseTime) {
+        if (collector.isCollectEnabled()) {
+            MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
+
+            AtomicLong last = lastRT.computeIfAbsent(metric, k -> new AtomicLong());
+            last.set(responseTime);
+
+            LongAccumulator min = minRT.computeIfAbsent(metric, k -> new LongAccumulator(Long::min, Long.MAX_VALUE));
+            min.accumulate(responseTime);
+
+            LongAccumulator max = maxRT.computeIfAbsent(metric, k -> new LongAccumulator(Long::max, Long.MIN_VALUE));
+            max.accumulate(responseTime);
+
+            AtomicLong total = totalRT.computeIfAbsent(metric, k -> new AtomicLong());
+            total.addAndGet(responseTime);
+
+            AtomicLong count = rtCount.computeIfAbsent(metric, k -> new AtomicLong());
+            count.incrementAndGet();
+
+            avgRT.computeIfAbsent(metric, k -> new AtomicLong());
+
+            publishEvent(new RTEvent(metric, responseTime));
+        }
+    }
+
+    private void init() {
+        stats.put(RequestEvent.Type.TOTAL, new DefaultMetricsStatHandler(applicationName){
+            @Override
+            public void doNotify(MethodMetric metric) {
+                publishEvent(new RequestEvent(metric, RequestEvent.Type.TOTAL));
+            }
+        });
+
+        stats.put(RequestEvent.Type.SUCCEED, new DefaultMetricsStatHandler(applicationName) {
+            @Override
+            public void doNotify(MethodMetric metric) {
+                publishEvent(new RequestEvent(metric, RequestEvent.Type.SUCCEED));
+            }
+        });
+
+        stats.put(RequestEvent.Type.FAILED, new DefaultMetricsStatHandler(applicationName) {
+            @Override
+            public void doNotify(MethodMetric metric) {
+                publishEvent(new RequestEvent(metric, RequestEvent.Type.FAILED));
+            }
+        });
+
+        stats.put(RequestEvent.Type.BUSINESS_FAILED, new DefaultMetricsStatHandler(applicationName) {
+            @Override
+            public void doNotify(MethodMetric metric) {
+                publishEvent(new RequestEvent(metric, RequestEvent.Type.BUSINESS_FAILED));
+            }
+        });
+
+        stats.put(RequestEvent.Type.PROCESSING, new DefaultMetricsStatHandler(applicationName));
+    }
+
+    private void publishEvent(MetricsEvent event) {
+        for (MetricsListener listener : listeners) {
+            listener.onEvent(event);
+        }
+    }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatHandler.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatHandler.java
new file mode 100644
index 0000000..d39c5d4
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/collector/stat/MetricsStatHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.metrics.collector.stat;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.dubbo.common.metrics.model.MethodMetric;
+
+public interface MetricsStatHandler {
+    Map<MethodMetric, AtomicLong> get();
+    void increase(String interfaceName, String methodName, String group, String version);
+    void decrease(String interfaceName, String methodName, String group, String version);
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/event/RequestEvent.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/event/RequestEvent.java
index b1d050b..f0a6677 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/event/RequestEvent.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/event/RequestEvent.java
@@ -40,6 +40,8 @@
         TOTAL,
         SUCCEED,
         FAILED,
+        BUSINESS_FAILED,
+
         PROCESSING
     }
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/model/MetricsKey.java b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/model/MetricsKey.java
index 07816a3..bbb1bd8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/model/MetricsKey.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/metrics/model/MetricsKey.java
@@ -22,10 +22,14 @@
     METRIC_REQUESTS_TOTAL("requests.total", "Total Requests"),
     METRIC_REQUESTS_SUCCEED("requests.succeed", "Succeed Requests"),
     METRIC_REQUESTS_FAILED("requests.failed", "Failed Requests"),
+    METRIC_REQUEST_BUSINESS_FAILED("requests.business.failed","Failed Business Requests"),
     METRIC_REQUESTS_PROCESSING("requests.processing", "Processing Requests"),
+
     METRIC_REQUESTS_TOTAL_AGG("requests.total.aggregate", "Aggregated Total Requests"),
     METRIC_REQUESTS_SUCCEED_AGG("requests.succeed.aggregate", "Aggregated Succeed Requests"),
     METRIC_REQUESTS_FAILED_AGG("requests.failed.aggregate", "Aggregated Failed Requests"),
+    METRIC_REQUESTS_BUSINESS_FAILED_AGG("requests.business.failed.aggregate", "Aggregated Business Failed Requests"),
+
     METRIC_QPS("qps", "Query Per Seconds"),
     METRIC_RT_LAST("rt.last", "Last Response Time"),
     METRIC_RT_MIN("rt.min", "Min Response Time"),
diff --git a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
index b473d1f..302c851 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
@@ -54,6 +54,7 @@
     private final Map<MethodMetric, TimeWindowCounter> totalRequests = new ConcurrentHashMap<>();
     private final Map<MethodMetric, TimeWindowCounter> succeedRequests = new ConcurrentHashMap<>();
     private final Map<MethodMetric, TimeWindowCounter> failedRequests = new ConcurrentHashMap<>();
+    private final Map<MethodMetric, TimeWindowCounter> businessFailedRequests = new ConcurrentHashMap<>();
     private final Map<MethodMetric, TimeWindowCounter> qps = new ConcurrentHashMap<>();
     private final Map<MethodMetric, TimeWindowQuantile> rt = new ConcurrentHashMap<>();
 
@@ -113,6 +114,10 @@
             case FAILED:
                 counter = failedRequests.computeIfAbsent(metric, k -> new TimeWindowCounter(bucketNum, timeWindowSeconds));
                 break;
+            case BUSINESS_FAILED:
+                counter = businessFailedRequests.computeIfAbsent(metric, k -> new TimeWindowCounter(bucketNum, timeWindowSeconds));
+                break;
+
             default:
                 break;
         }
@@ -136,6 +141,7 @@
         totalRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_TOTAL_AGG, k.getTags(), REQUESTS, v::get)));
         succeedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_SUCCEED_AGG, k.getTags(), REQUESTS, v::get)));
         failedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_FAILED_AGG, k.getTags(), REQUESTS, v::get)));
+        businessFailedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_BUSINESS_FAILED_AGG, k.getTags(), REQUESTS, v::get)));
     }
 
     private void collectQPS(List<MetricSample> list) {
diff --git a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsCollectExecutor.java b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsCollectExecutor.java
new file mode 100644
index 0000000..0ae3fdf
--- /dev/null
+++ b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsCollectExecutor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.filter;
+
+import static org.apache.dubbo.common.constants.MetricsConstants.METRIC_FILTER_START_TIME;
+
+import java.util.function.Supplier;
+
+import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+
+public class MetricsCollectExecutor {
+
+    private final DefaultMetricsCollector collector;
+    private final Invocation    invocation;
+    private String  interfaceName;
+    private String  methodName;
+    private String  group;
+    private String  version;
+
+
+    public MetricsCollectExecutor(DefaultMetricsCollector collector, Invocation invocation) {
+        init(invocation);
+
+        this.collector = collector;
+
+        this.invocation = invocation;
+    }
+
+    public void beforeExecute() {
+        collector.increaseTotalRequests(interfaceName, methodName, group, version);
+        collector.increaseProcessingRequests(interfaceName, methodName, group, version);
+        invocation.put(METRIC_FILTER_START_TIME, System.currentTimeMillis());
+    }
+
+    public void postExecute(Result result) {
+        if (result.hasException()) {
+            this.throwExecute(result.getException());
+            return;
+        }
+        collector.increaseSucceedRequests(interfaceName, methodName, group, version);
+        endExecute();
+    }
+
+    public void throwExecute(Throwable throwable){
+        if (throwable instanceof RpcException) {
+            RpcException rpcException = (RpcException)throwable;
+            if (rpcException.isBiz()) {
+                collector.businessFailedRequests(interfaceName, methodName, group, version);
+            }else{
+                collector.increaseFailedRequests(interfaceName, methodName, group, version);
+            }
+        }
+        endExecute(()-> throwable instanceof RpcException && ((RpcException) throwable).isBiz());
+    }
+
+    private void endExecute(){
+        this.endExecute(() -> true);
+    }
+
+    private void endExecute(Supplier<Boolean> rtStat){
+        if (rtStat.get()) {
+            Long endTime = System.currentTimeMillis();
+            Long beginTime = (Long) invocation.get(METRIC_FILTER_START_TIME);
+            Long rt = endTime - beginTime;
+            collector.addRT(interfaceName, methodName, group, version, rt);
+        }
+        collector.decreaseProcessingRequests(interfaceName, methodName, group, version);
+    }
+
+    private void init(Invocation invocation) {
+        String serviceUniqueName = invocation.getTargetServiceUniqueName();
+        String methodName = invocation.getMethodName();
+        String group = null;
+        String interfaceAndVersion;
+        String[] arr = serviceUniqueName.split("/");
+        if (arr.length == 2) {
+            group = arr[0];
+            interfaceAndVersion = arr[1];
+        } else {
+            interfaceAndVersion = arr[0];
+        }
+
+        String[] ivArr = interfaceAndVersion.split(":");
+        String interfaceName = ivArr[0];
+        String version = ivArr.length == 2 ? ivArr[1] : null;
+
+        this.interfaceName = interfaceName;
+        this.methodName = methodName;
+        this.group = group;
+        this.version = version;
+    }
+}
diff --git a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsFilter.java b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsFilter.java
index 031f8ef..4e5cc13 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsFilter.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/filter/MetricsFilter.java
@@ -14,11 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.dubbo.metrics.filter;
 
+import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
+
+import java.util.function.Consumer;
+
 import org.apache.dubbo.common.extension.Activate;
 import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.rpc.BaseFilter;
 import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
@@ -27,19 +31,17 @@
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ScopeModelAware;
 
-import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
-
 @Activate(group = PROVIDER, order = -1)
-public class MetricsFilter implements Filter, ScopeModelAware {
+public class MetricsFilter implements Filter, BaseFilter.Listener, ScopeModelAware {
 
     private DefaultMetricsCollector collector = null;
 
     private ApplicationModel applicationModel;
 
+
     @Override
     public void setApplicationModel(ApplicationModel applicationModel) {
         this.applicationModel = applicationModel;
-
         collector = applicationModel.getBeanFactory().getBean(DefaultMetricsCollector.class);
     }
 
@@ -48,38 +50,23 @@
         if (collector == null || !collector.isCollectEnabled()) {
             return invoker.invoke(invocation);
         }
+        collect(invocation, MetricsCollectExecutor::beforeExecute);
 
-        String serviceUniqueName = invocation.getTargetServiceUniqueName();
-        String methodName = invocation.getMethodName();
-        String group = null;
-        String interfaceAndVersion;
-        String[] arr = serviceUniqueName.split("/");
-        if (arr.length == 2) {
-            group = arr[0];
-            interfaceAndVersion = arr[1];
-        } else {
-            interfaceAndVersion = arr[0];
-        }
+        return invoker.invoke(invocation);
+    }
 
-        String[] ivArr = interfaceAndVersion.split(":");
-        String interfaceName = ivArr[0];
-        String version = ivArr.length == 2 ? ivArr[1] : null;
-        collector.increaseTotalRequests(interfaceName, methodName, group, version);
-        collector.increaseProcessingRequests(interfaceName, methodName, group, version);
+    @Override
+    public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
+        collect(invocation, collector->collector.postExecute(result));
+    }
 
-        Long startTime = System.currentTimeMillis();
-        try {
-            Result invoke = invoker.invoke(invocation);
-            collector.increaseSucceedRequests(interfaceName, methodName, group, version);
-            return invoke;
-        } catch (RpcException e) {
-            collector.increaseFailedRequests(interfaceName, methodName, group, version);
-            throw e;
-        } finally {
-            Long endTime = System.currentTimeMillis();
-            Long rt = endTime - startTime;
-            collector.addRT(interfaceName, methodName, group, version, rt);
-            collector.decreaseProcessingRequests(interfaceName, methodName, group, version);
-        }
+    @Override
+    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+        collect(invocation,collector-> collector.throwExecute(t));
+    }
+
+    private void collect(Invocation invocation, Consumer<MetricsCollectExecutor> execute) {
+        MetricsCollectExecutor collectorExecutor = new MetricsCollectExecutor(collector, invocation);
+        execute.accept(collectorExecutor);
     }
 }
diff --git a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
index 308e238..0b3317a 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
@@ -18,6 +18,7 @@
 package org.apache.dubbo.metrics.collector;
 
 import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.common.metrics.model.MetricsKey;
 import org.apache.dubbo.common.metrics.model.sample.GaugeMetricSample;
 import org.apache.dubbo.common.metrics.model.sample.MetricSample;
 import org.apache.dubbo.config.ApplicationConfig;
@@ -81,6 +82,7 @@
         defaultCollector.increaseTotalRequests(interfaceName, methodName, group, version);
         defaultCollector.increaseSucceedRequests(interfaceName, methodName, group, version);
         defaultCollector.increaseFailedRequests(interfaceName, methodName, group, version);
+        defaultCollector.businessFailedRequests(interfaceName,methodName,group,version);
 
         List<MetricSample> samples = collector.collect();
         for (MetricSample sample : samples) {
@@ -101,6 +103,8 @@
         Assertions.assertEquals(sampleMap.get("requests.total.aggregate"), 1L);
         Assertions.assertEquals(sampleMap.get("requests.succeed.aggregate"), 1L);
         Assertions.assertEquals(sampleMap.get("requests.failed.aggregate"), 1L);
+        Assertions.assertEquals(sampleMap.get(MetricsKey.METRIC_REQUESTS_BUSINESS_FAILED_AGG.getName()), 1L);
+
         Assertions.assertTrue(sampleMap.containsKey("qps"));
     }
 
diff --git a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
index 2e86a83..dbe6806 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
@@ -17,11 +17,25 @@
 
 package org.apache.dubbo.metrics.filter;
 
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_GROUP_KEY;
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_METHOD_KEY;
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_VERSION_KEY;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
+import org.apache.dubbo.common.metrics.model.MetricsKey;
 import org.apache.dubbo.common.metrics.model.sample.MetricSample;
 import org.apache.dubbo.config.ApplicationConfig;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -30,15 +44,6 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.apache.dubbo.common.constants.MetricsConstants.*;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.mock;
-
 public class MetricsFilterTest {
 
     private ApplicationModel applicationModel;
@@ -89,8 +94,9 @@
 
         try {
             filter.invoke(invoker, invocation);
-        } catch (Exception ignore) {
-
+        } catch (Exception e) {
+            Assertions.assertTrue(e instanceof RpcException);
+            filter.onError(e, invoker, invocation);
         }
 
         Map<String, MetricSample> metricsMap = getMetricsMap();
@@ -106,13 +112,45 @@
         Assertions.assertEquals(tags.get(TAG_VERSION_KEY), VERSION);
     }
 
+
+    @Test
+    public void testBusinessFailedRequests() {
+        collector.setCollectEnabled(true);
+
+        given(invoker.invoke(invocation)).willThrow(new RpcException(RpcException.BIZ_EXCEPTION));
+        initParam();
+
+        try {
+            filter.invoke(invoker, invocation);
+        } catch (Exception e) {
+            Assertions.assertTrue(e instanceof RpcException);
+            filter.onError(e, invoker, invocation);
+        }
+
+        Map<String, MetricSample> metricsMap = getMetricsMap();
+        Assertions.assertTrue(metricsMap.containsKey(MetricsKey.METRIC_REQUEST_BUSINESS_FAILED.getName()));
+        Assertions.assertFalse(metricsMap.containsKey("requests.succeed"));
+
+        MetricSample sample = metricsMap.get(MetricsKey.METRIC_REQUEST_BUSINESS_FAILED.getName());
+
+        Map<String, String> tags = sample.getTags();
+
+        Assertions.assertEquals(tags.get(TAG_INTERFACE_KEY), INTERFACE_NAME);
+        Assertions.assertEquals(tags.get(TAG_METHOD_KEY), METHOD_NAME);
+        Assertions.assertEquals(tags.get(TAG_GROUP_KEY), GROUP);
+        Assertions.assertEquals(tags.get(TAG_VERSION_KEY), VERSION);
+    }
+
     @Test
     public void testSucceedRequests() {
         collector.setCollectEnabled(true);
         given(invoker.invoke(invocation)).willReturn(new AppResponse("success"));
         initParam();
 
-        filter.invoke(invoker, invocation);
+        Result result = filter.invoke(invoker, invocation);
+
+        filter.onResponse(result, invoker, invocation);
+
         Map<String, MetricSample> metricsMap = getMetricsMap();
         Assertions.assertFalse(metricsMap.containsKey("requests.failed"));
         Assertions.assertTrue(metricsMap.containsKey("requests.succeed"));
@@ -134,7 +172,10 @@
         invocation.setMethodName(METHOD_NAME);
         invocation.setParameterTypes(new Class[]{String.class});
 
-        filter.invoke(invoker, invocation);
+        Result result = filter.invoke(invoker, invocation);
+
+        filter.onResponse(result, invoker, invocation);
+
         Map<String, MetricSample> metricsMap = getMetricsMap();
 
         MetricSample sample = metricsMap.get("requests.succeed");
@@ -154,7 +195,10 @@
         invocation.setMethodName(METHOD_NAME);
         invocation.setParameterTypes(new Class[]{String.class});
 
-        filter.invoke(invoker, invocation);
+        Result result = filter.invoke(invoker, invocation);
+
+        filter.onResponse(result, invoker, invocation);
+
         Map<String, MetricSample> metricsMap = getMetricsMap();
 
         MetricSample sample = metricsMap.get("requests.succeed");
@@ -174,7 +218,10 @@
         invocation.setMethodName(METHOD_NAME);
         invocation.setParameterTypes(new Class[]{String.class});
 
-        filter.invoke(invoker, invocation);
+        Result result = filter.invoke(invoker, invocation);
+
+        filter.onResponse(result, invoker, invocation);
+
         Map<String, MetricSample> metricsMap = getMetricsMap();
 
         MetricSample sample = metricsMap.get("requests.succeed");