// blob: 4b6895635bb4c72b2556a1e6ef96aa2fce03ae04 [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.protocol.dubbo;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcInvocation;
/**
 * Helper for callback services that are passed as method arguments.
 * <p>
 * On encoding, an argument flagged as a callback is exported (or unexported) as a
 * service on the channel and replaced on the wire by an instance id stored in the
 * invocation attachments. On decoding, that instance id is turned into a proxy
 * whose invoker wraps the channel (or the previously created proxy is destroyed).
 *
 * @author chao.liuc
 */
class CallbackServiceCodec {
    private static final Logger logger = LoggerFactory.getLogger(CallbackServiceCodec.class);

    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    private static final DubboProtocol protocol = DubboProtocol.getDubboProtocol();

    // Callback status of a single argument, as resolved by isCallBack(...).
    private static final byte CALLBACK_NONE = 0x0;
    private static final byte CALLBACK_CREATE = 0x1;
    private static final byte CALLBACK_DESTROY = 0x2;

    // Attachment key prefix; the argument index is appended to it.
    private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-";

    /**
     * Resolves the callback status of one method argument.
     * <p>
     * The URL parameter consulted is {@code methodName.argIndex.callback}
     * (argIndex starts at 0). The value is compared case-insensitively.
     *
     * @param url        URL holding the parameters; may be null
     * @param methodName name of the invoked method
     * @param argIndex   index of the argument
     * @return {@code CALLBACK_CREATE} for "true", {@code CALLBACK_DESTROY} for "false",
     *         {@code CALLBACK_NONE} when the url or the parameter is missing or has any other value
     */
    private static byte isCallBack(URL url, String methodName, int argIndex) {
        if (url == null) {
            return CALLBACK_NONE;
        }
        String callback = url.getParameter(methodName + "." + argIndex + ".callback");
        if ("true".equalsIgnoreCase(callback)) {
            return CALLBACK_CREATE;
        }
        if ("false".equalsIgnoreCase(callback)) {
            return CALLBACK_DESTROY;
        }
        return CALLBACK_NONE;
    }

    /**
     * Client side: exports or unexports a callback service for the given instance.
     * <p>
     * The exporter is cached as a channel attribute keyed by the instance id, so an
     * instance that is already exported on the channel is not exported again, and
     * unexporting an instance that was never exported is a no-op.
     *
     * @param channel channel the callback is bound to
     * @param url     URL of the invoker being called; its parameters are copied into the export URL
     * @param clazz   callback interface (the declared parameter type)
     * @param inst    callback instance
     * @param export  true to export, false to unexport
     * @return the instance id ({@code System.identityHashCode(inst)}) as a string
     * @throws IOException declared for callers; not thrown directly by this method
     * @throws IllegalStateException if exporting would exceed the callback instance limit
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static String exportOrunexportCallbackService(Channel channel, URL url, Class clazz, Object inst, boolean export) throws IOException {
        // NOTE(review): identity hash codes are not guaranteed to be unique per object;
        // the keys below assume they do not collide within one JVM.
        int instid = System.identityHashCode(inst);

        Map<String, String> params = new HashMap<String, String>(3);
        // no need to create a new client
        params.put(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
        // mark as a callback service to ease troubleshooting
        params.put(Constants.IS_CALLBACK_SERVICE, Boolean.TRUE.toString());
        String group = url.getParameter(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            params.put(Constants.GROUP_KEY, group);
        }
        // add the method names, used for method checking / automatic degradation (see dubbo protocol)
        params.put(Constants.METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ","));

        Map<String, String> tmpmap = new HashMap<String, String>(url.getParameters());
        tmpmap.putAll(params);
        tmpmap.remove(Constants.VERSION_KEY); // callbacks do not distinguish versions
        URL exporturl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpmap);

        // within one JVM there is no need to create one exporter per channel; the cache key is keyed by instance id only
        String cacheKey = getClientSideCallbackServiceCacheKey(instid);
        String countkey = getClientSideCountKey(clazz.getName());
        if (export) {
            // one channel may carry several callback instances; an already exported instance is not exported again
            if (!channel.hasAttribute(cacheKey)) {
                if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
                    Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exporturl);
                    // TODO(review): resource destruction? (question carried over from the original author)
                    Exporter<?> exporter = protocol.export(invoker);
                    // records that this instance id has been exported on the channel
                    channel.setAttribute(cacheKey, exporter);
                    logger.info("export a callback service :" + exporturl + ", on " + channel + ", url is: " + url);
                    increaseInstanceCount(channel, countkey);
                }
            }
        } else {
            if (channel.hasAttribute(cacheKey)) {
                Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey);
                exporter.unexport();
                channel.removeAttribute(cacheKey);
                decreaseInstanceCount(channel, countkey);
            }
        }
        return String.valueOf(instid);
    }

    /**
     * Server side: refers to (creates a proxy for) or destroys a callback service.
     * <p>
     * The proxy and its invoker are cached as channel attributes. Referring an
     * instance that already has a proxy returns the cached proxy; destroying an
     * instance without a proxy does nothing.
     *
     * @param channel channel the callback request arrived on
     * @param url     URL of the invoker being called
     * @param clazz   callback interface (the declared parameter type)
     * @param inv     invocation being decoded; only used for logging
     * @param instid  instance id sent by the peer
     * @param isRefer true to create the proxy, false to destroy it
     * @return the proxy for the callback; on destroy, the proxy that was cached (null if none)
     * @throws IllegalStateException if referring would exceed the callback instance limit
     */
    @SuppressWarnings("unchecked")
    private static Object referOrdestroyCallbackService(Channel channel, URL url, Class<?> clazz, Invocation inv, int instid, boolean isRefer) {
        String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid);
        String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid);
        Object proxy = channel.getAttribute(proxyCacheKey);
        String countkey = getServerSideCountKey(channel, clazz.getName());
        if (isRefer) {
            if (proxy == null) {
                if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, true)) {
                    url = url.setPath(clazz.getName());
                    @SuppressWarnings("rawtypes")
                    Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, url, String.valueOf(instid));
                    proxy = proxyFactory.getProxy(invoker);
                    channel.setAttribute(proxyCacheKey, proxy);
                    channel.setAttribute(invokerCacheKey, invoker);
                    increaseInstanceCount(channel, countkey);

                    // Register the invoker in the channel-wide set of callback invokers.
                    // The set is created lazily, but the invoker must be added every time:
                    // adding it only when the set was just created would leave every later
                    // callback invoker on the same channel untracked, although the destroy
                    // branch below expects to find it there.
                    // Creation of the set is not synchronized (concurrency deliberately ignored).
                    Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY);
                    if (callbackInvokers == null) {
                        callbackInvokers = new ConcurrentHashSet<Invoker<?>>(1);
                        channel.setAttribute(Constants.CHANNEL_CALLBACK_KEY, callbackInvokers);
                    }
                    callbackInvokers.add(invoker);

                    logger.info("method " + inv.getMethodName() + " include a callback service :" + invoker.getUrl() + ", a proxy :" + invoker + " has been created.");
                }
            }
        } else {
            if (proxy != null) {
                Invoker<?> invoker = (Invoker<?>) channel.getAttribute(invokerCacheKey);
                try {
                    Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY);
                    if (callbackInvokers != null) {
                        callbackInvokers.remove(invoker);
                    }
                    invoker.destroy();
                } catch (Exception e) {
                    // best effort: a failure to destroy must not prevent the cache cleanup below
                    logger.error(e.getMessage(), e);
                }
                // un-refer: simply drop the cached entries
                channel.removeAttribute(proxyCacheKey);
                channel.removeAttribute(invokerCacheKey);
                decreaseInstanceCount(channel, countkey);
            }
        }
        return proxy;
    }

    /** Channel attribute key under which the client side caches the exporter of a callback instance. */
    private static String getClientSideCallbackServiceCacheKey(int instid) {
        return Constants.CALLBACK_SERVICE_KEY + "." + instid;
    }

    /** Channel attribute key under which the server side caches the proxy of a callback instance. */
    private static String getServerSideCallbackServiceCacheKey(Channel channel, String interfaceClass, int instid) {
        return Constants.CALLBACK_SERVICE_PROXY_KEY + "." + System.identityHashCode(channel) + "." + interfaceClass + "." + instid;
    }

    /** Channel attribute key under which the server side caches the invoker of a callback instance. */
    private static String getServerSideCallbackInvokerCacheKey(Channel channel, String interfaceClass, int instid) {
        return getServerSideCallbackServiceCacheKey(channel, interfaceClass, instid) + "." + "invoker";
    }

    /** Channel attribute key of the client-side instance counter for a callback interface. */
    private static String getClientSideCountKey(String interfaceClass) {
        return Constants.CALLBACK_SERVICE_KEY + "." + interfaceClass + ".COUNT";
    }

    /** Channel attribute key of the server-side instance counter for a callback interface. */
    private static String getServerSideCountKey(Channel channel, String interfaceClass) {
        return Constants.CALLBACK_SERVICE_PROXY_KEY + "." + System.identityHashCode(channel) + "." + interfaceClass + ".COUNT";
    }

    /**
     * Checks the per-interface callback instance counter of the channel against the
     * limit taken from the URL ({@code Constants.CALLBACK_INSTANCES_LIMIT_KEY}, defaulting
     * to {@code Constants.DEFAULT_CALLBACK_INSTANCES}).
     * <p>
     * Note that this method never returns true: reaching the limit is reported by an exception.
     *
     * @param instid unused; kept for signature compatibility
     * @return false when one more instance is allowed
     * @throws IllegalStateException when the current count has reached the limit
     */
    private static boolean isInstancesOverLimit(Channel channel, URL url, String interfaceClass, int instid, boolean isServer) {
        String countKey = isServer ? getServerSideCountKey(channel, interfaceClass) : getClientSideCountKey(interfaceClass);
        Integer count = (Integer) channel.getAttribute(countKey);
        int limit = url.getParameter(Constants.CALLBACK_INSTANCES_LIMIT_KEY, Constants.DEFAULT_CALLBACK_INSTANCES);
        if (count != null && count >= limit) {
            // client side error
            throw new IllegalStateException("interface " + interfaceClass + " `s callback instances num exceed providers limit :" + limit
                    + " ,current num: " + (count + 1) + ". The new callback service will not work !!! you can cancle the callback service which exported before. channel :" + channel);
        }
        return false;
    }

    /**
     * Increments the counter stored under {@code countkey} on the channel (starting at 1).
     * Failures are logged and otherwise ignored.
     */
    private static void increaseInstanceCount(Channel channel, String countkey) {
        try {
            // NOTE: read-modify-write without synchronization; concurrent updates are not handled
            Integer count = (Integer) channel.getAttribute(countkey);
            if (count == null) {
                count = 1;
            } else {
                count++;
            }
            channel.setAttribute(countkey, count);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Decrements the counter stored under {@code countkey} on the channel; a missing or
     * non-positive counter is left untouched. Failures are logged and otherwise ignored.
     */
    private static void decreaseInstanceCount(Channel channel, String countkey) {
        try {
            Integer count = (Integer) channel.getAttribute(countkey);
            if (count == null || count <= 0) {
                return;
            }
            count--;
            channel.setAttribute(countkey, count);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Returns the value to put on the wire for the argument at {@code paraIndex}.
     * <p>
     * A non-callback argument is returned as is. For a callback argument the callback
     * service is exported (or unexported) on the channel, its instance id is stored in
     * the invocation attachment {@code "sys_callback_arg-" + paraIndex}, and null is returned.
     *
     * @param channel   channel the invocation is sent on
     * @param inv       invocation being encoded
     * @param paraIndex index of the argument
     * @return the argument itself, or null for a callback argument
     * @throws IOException propagated from the export/unexport step
     */
    public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException {
        // when encoding, the url is available directly from the invoker
        URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl();
        byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex);
        Object[] args = inv.getArguments();
        Class<?>[] pts = inv.getParameterTypes();
        switch (callbackstatus) {
            case CALLBACK_CREATE:
                inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true));
                return null;
            case CALLBACK_DESTROY:
                inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false));
                return null;
            case CALLBACK_NONE:
            default:
                return args[paraIndex];
        }
    }

    /**
     * Returns the argument value to hand to the invoked method for the argument at {@code paraIndex}.
     * <p>
     * For a callback argument a proxy is created (or destroyed) whose invocations go
     * back through the channel to the peer's callback instance. For any other argument,
     * or when the target invoker cannot be resolved, {@code inObject} is returned unchanged.
     *
     * @param channel   channel the invocation was received on
     * @param inv       invocation being decoded
     * @param pts       parameter types of the invoked method
     * @param paraIndex index of the argument
     * @param inObject  the deserialized argument value
     * @return the callback proxy for a callback argument, otherwise {@code inObject}
     * @throws IOException if creating or destroying the callback proxy fails for any reason
     */
    public static Object decodeInvocationArgument(Channel channel, RpcInvocation inv, Class<?>[] pts, int paraIndex, Object inObject) throws IOException {
        // when decoding, the url has to be looked up from the channel and the invocation
        URL url = null;
        try {
            url = DubboProtocol.getDubboProtocol().getInvoker(channel, inv).getUrl();
        } catch (RemotingException e) {
            if (logger.isInfoEnabled()) {
                logger.info(e.getMessage(), e);
            }
            return inObject;
        }
        byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex);
        switch (callbackstatus) {
            case CALLBACK_CREATE:
                try {
                    return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    throw new IOException(StringUtils.toString(e));
                }
            case CALLBACK_DESTROY:
                try {
                    return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false);
                } catch (Exception e) {
                    throw new IOException(StringUtils.toString(e));
                }
            case CALLBACK_NONE:
            default:
                return inObject;
        }
    }
}