blob: f206890e3d24dae2ade5c7f0fda1009c73448bc1 [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.monitor.support;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.monitor.Monitor;
import com.alibaba.dubbo.monitor.MonitorFactory;
import com.alibaba.dubbo.monitor.MonitorService;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
/**
* MonitorFilter. (SPI, Singleton, ThreadSafe)
*
* @author william.liangf
*/
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MonitorFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();
private MonitorFactory monitorFactory;
public void setMonitorFactory(MonitorFactory monitorFactory) {
this.monitorFactory = monitorFactory;
}
// 调用过程拦截
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
RpcContext context = RpcContext.getContext(); // 提供方必须在invoke()之前获取context信息
long start = System.currentTimeMillis(); // 记录起始时间戮
getConcurrent(invoker, invocation).incrementAndGet(); // 并发计数
try {
Result result = invoker.invoke(invocation); // 让调用链往下执行
collect(invoker, invocation, context, start, false);
return result;
} catch (RpcException e) {
collect(invoker, invocation, context, start, true);
throw e;
} finally {
getConcurrent(invoker, invocation).decrementAndGet(); // 并发计数
}
} else {
return invoker.invoke(invocation);
}
}
// 信息采集
private void collect(Invoker<?> invoker, Invocation invocation, RpcContext context, long start, boolean error) {
try {
// ---- 服务信息获取 ----
long elapsed = System.currentTimeMillis() - start; // 计算调用耗时
int concurrent = getConcurrent(invoker, invocation).get(); // 当前并发数
String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
String service = invoker.getInterface().getName(); // 获取服务名称
String method = invocation.getMethodName(); // 获取方法名
URL url = URL.valueOf(invoker.getUrl().getParameterAndDecoded(Constants.MONITOR_KEY));
Monitor monitor = monitorFactory.getMonitor(url);
// ---- 服务提供方监控 ----
String server = context.getLocalAddressString(); // 本地提供方地址
if (invoker.getUrl().getAddress().equals(server)) {
monitor.count(new URL(invoker.getUrl().getProtocol(), context.getRemoteHost(), 0, service + "/" + method)
.addParameters(MonitorService.APPLICATION, application,
MonitorService.INTERFACE, service,
MonitorService.METHOD, method,
MonitorService.PROVIDER, NetUtils.getLocalHost() + ":" + context.getLocalPort(),
error ? MonitorService.FAILURE : MonitorService.SUCCESS, String.valueOf(1),
MonitorService.ELAPSED, String.valueOf(elapsed),
MonitorService.CONCURRENT, String.valueOf(concurrent)));
}
// ---- 服务消费方监控 ----
context = RpcContext.getContext(); // 消费方必须在invoke()之后获取context信息
server = context.getRemoteAddressString(); // 远程提供方地址
if (invoker.getUrl().getAddress().equals(server)) {
monitor.count(new URL(invoker.getUrl().getProtocol(), context.getRemoteHost(), context.getRemotePort(), service + "/" + method)
.addParameters(MonitorService.APPLICATION, application,
MonitorService.INTERFACE, service,
MonitorService.METHOD, method,
MonitorService.CONSUMER, NetUtils.getLocalHost(),
error ? MonitorService.FAILURE : MonitorService.SUCCESS, String.valueOf(1),
MonitorService.ELAPSED, String.valueOf(elapsed),
MonitorService.CONCURRENT, String.valueOf(concurrent)));
}
} catch (Throwable t) {
logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
}
}
// 获取并发计数器
private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
AtomicInteger concurrent = concurrents.get(key);
if (concurrent == null) {
concurrents.putIfAbsent(key, new AtomicInteger());
concurrent = concurrents.get(key);
}
return concurrent;
}
}