| /*
|
| * Copyright 1999-2011 Alibaba Group.
|
| *
|
| * Licensed under the Apache License, Version 2.0 (the "License");
|
| * you may not use this file except in compliance with the License.
|
| * You may obtain a copy of the License at
|
| *
|
| * http://www.apache.org/licenses/LICENSE-2.0
|
| *
|
| * Unless required by applicable law or agreed to in writing, software
|
| * distributed under the License is distributed on an "AS IS" BASIS,
|
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| * See the License for the specific language governing permissions and
|
| * limitations under the License.
|
| */
|
| package com.alibaba.dubbo.rpc.protocol.dubbo; |
| |
| import java.io.IOException;
|
| import java.util.HashMap;
|
| import java.util.Map;
|
| import java.util.Set;
|
|
|
| import com.alibaba.dubbo.common.Constants;
|
| import com.alibaba.dubbo.common.URL;
|
| import com.alibaba.dubbo.common.bytecode.Wrapper;
|
| import com.alibaba.dubbo.common.extension.ExtensionLoader;
|
| import com.alibaba.dubbo.common.logger.Logger;
|
| import com.alibaba.dubbo.common.logger.LoggerFactory;
|
| import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
|
| import com.alibaba.dubbo.common.utils.StringUtils;
|
| import com.alibaba.dubbo.remoting.Channel;
|
| import com.alibaba.dubbo.remoting.RemotingException;
|
| import com.alibaba.dubbo.rpc.Exporter;
|
| import com.alibaba.dubbo.rpc.Invocation;
|
| import com.alibaba.dubbo.rpc.Invoker;
|
| import com.alibaba.dubbo.rpc.ProxyFactory;
|
| import com.alibaba.dubbo.rpc.RpcInvocation;
|
| |
| /** |
| * callback 服务帮助类. |
| * @author chao.liuc |
| * |
| */ |
| class CallbackServiceCodec { |
| private static final Logger logger = LoggerFactory.getLogger(CallbackServiceCodec.class); |
| |
| private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); |
| private static final DubboProtocol protocol = DubboProtocol.getDubboProtocol(); |
| private static final byte CALLBACK_NONE = 0x0; |
| private static final byte CALLBACK_CREATE = 0x1; |
| private static final byte CALLBACK_DESTROY = 0x2; |
| private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-";
|
| |
| private static byte isCallBack(URL url, String methodName ,int argIndex){ |
| //参数callback的规则是 方法名称.参数index(0开始).callback |
| byte isCallback = CALLBACK_NONE; |
| if (url != null ) { |
| String callback = url.getParameter(methodName+"."+argIndex+".callback"); |
| if(callback != null) { |
| if (callback.equalsIgnoreCase("true")) { |
| isCallback = CALLBACK_CREATE; |
| }else if(callback.equalsIgnoreCase("false")){ |
| isCallback = CALLBACK_DESTROY; |
| } |
| } |
| } |
| return isCallback; |
| } |
| |
| /** |
| * client 端export callback service
|
| * @param channel
|
| * @param clazz
|
| * @param inst
|
| * @param export
|
| * @param out |
| * @throws IOException |
| */ |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| private static String exportOrunexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException{ |
| int instid = System.identityHashCode(inst); |
| |
| Map<String,String> params = new HashMap<String,String>(3); |
| //不需要在重新new client |
| params.put(Constants.IS_SERVER_KEY, Boolean.FALSE.toString()); |
| //标识callback 变于排查问题 |
| params.put(Constants.IS_CALLBACK_SERVICE, Boolean.TRUE.toString()); |
| String group = url.getParameter(Constants.GROUP_KEY); |
| if (group != null && group.length() > 0){ |
| params.put(Constants.GROUP_KEY,group); |
| } |
| //增加方法,变于方法检查,自动降级(见dubbo protocol) |
| params.put(Constants.METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ",")); |
| |
| Map<String, String> tmpmap = new HashMap<String, String>(url.getParameters()); |
| tmpmap.putAll(params); |
| tmpmap.remove(Constants.VERSION_KEY);//callback不需要区分version |
| URL exporturl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName()+"."+instid, tmpmap); |
| |
| //同一个jvm不需要对不同的channel产生多个exporter cache key不会碰撞 |
| String cacheKey = getClientSideCallbackServiceCacheKey(instid); |
| String countkey = getClientSideCountKey(clazz.getName()); |
| if(export){ |
| //同一个channel 可以有多个callback instance. 不同的instance不重新export |
| if( ! channel.hasAttribute(cacheKey)){ |
| if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) { |
| Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exporturl); |
| //资源销毁? |
| Exporter<?> exporter = protocol.export(invoker); |
| //这个用来记录instid是否发布过服务 |
| channel.setAttribute(cacheKey, exporter); |
| logger.info("export a callback service :"+exporturl +", on "+channel + ", url is: " + url); |
| increaseInstanceCount(channel, countkey); |
| } |
| } |
| }else { |
| if(channel.hasAttribute(cacheKey)){ |
| Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey); |
| exporter.unexport(); |
| channel.removeAttribute(cacheKey); |
| decreaseInstanceCount(channel, countkey); |
| } |
| } |
| return String.valueOf(instid); |
| } |
| |
| /** |
| * server端 应用一个callbackservice
|
| * @param url |
| */ |
| @SuppressWarnings("unchecked") |
| private static Object referOrdestroyCallbackService(Channel channel, URL url, Class<?> clazz ,Invocation inv ,int instid, boolean isRefer){ |
| Object proxy = null; |
| String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid); |
| String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid); |
| proxy = channel.getAttribute(proxyCacheKey) ; |
| String countkey = getServerSideCountKey(channel, clazz.getName()); |
| if (isRefer){ |
| if( proxy == null ){ |
| if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, true)){
|
| url = url.setPath(clazz.getName()); |
| @SuppressWarnings("rawtypes")
|
| Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, url, String.valueOf(instid)); |
| proxy = proxyFactory.getProxy(invoker); |
| channel.setAttribute(proxyCacheKey, proxy); |
| channel.setAttribute(invokerCacheKey, invoker); |
| increaseInstanceCount(channel, countkey); |
| |
| //convert error fail fast . |
| //ignore concurrent problem. |
| Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>)channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY); |
| if (callbackInvokers == null){ |
| callbackInvokers = new ConcurrentHashSet<Invoker<?>>(1); |
| callbackInvokers.add(invoker); |
| channel.setAttribute(Constants.CHANNEL_CALLBACK_KEY, callbackInvokers); |
| } |
| logger.info ("method "+inv.getMethodName()+" include a callback service :"+invoker.getUrl() +", a proxy :"+invoker +" has been created.") ; |
| } |
| } |
| } else { |
| if(proxy != null){ |
| Invoker<?> invoker = (Invoker<?>)channel.getAttribute(invokerCacheKey); |
| try{ |
| Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>)channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY); |
| if (callbackInvokers != null ) { |
| callbackInvokers.remove(invoker); |
| } |
| invoker.destroy(); |
| }catch (Exception e) { |
| logger.error(e.getMessage(), e); |
| } |
| //取消refer 直接在map中去除, |
| channel.removeAttribute(proxyCacheKey); |
| channel.removeAttribute(invokerCacheKey); |
| decreaseInstanceCount(channel,countkey); |
| } |
| } |
| return proxy; |
| } |
| |
| private static String getClientSideCallbackServiceCacheKey(int instid){ |
| return Constants.CALLBACK_SERVICE_KEY+"."+instid; |
| } |
| private static String getServerSideCallbackServiceCacheKey(Channel channel, String interfaceClass, int instid){ |
| return Constants.CALLBACK_SERVICE_PROXY_KEY+"."+System.identityHashCode(channel)+"."+ interfaceClass +"."+instid; |
| } |
| private static String getServerSideCallbackInvokerCacheKey(Channel channel, String interfaceClass, int instid){ |
| return getServerSideCallbackServiceCacheKey(channel, interfaceClass, instid) + "." + "invoker"; |
| } |
| |
| private static String getClientSideCountKey(String interfaceClass){ |
| return Constants.CALLBACK_SERVICE_KEY+"."+interfaceClass+".COUNT"; |
| } |
| private static String getServerSideCountKey(Channel channel, String interfaceClass){ |
| return Constants.CALLBACK_SERVICE_PROXY_KEY+"."+System.identityHashCode(channel)+"."+interfaceClass+".COUNT"; |
| } |
| private static boolean isInstancesOverLimit(Channel channel, URL url ,String interfaceClass, int instid, boolean isServer){ |
| Integer count = (Integer)channel.getAttribute(isServer ? getServerSideCountKey(channel,interfaceClass) : getClientSideCountKey(interfaceClass)); |
| int limit = url.getParameter(Constants.CALLBACK_INSTANCES_LIMIT_KEY, Constants.DEFAULT_CALLBACK_INSTANCES); |
| if (count != null && count >= limit){ |
| //client side error |
| throw new IllegalStateException("interface " + interfaceClass +" `s callback instances num exceed providers limit :"+ limit |
| +" ,current num: "+(count+1)+". The new callback service will not work !!! you can cancle the callback service which exported before. channel :"+ channel); |
| }else { |
| return false; |
| } |
| } |
| private static void increaseInstanceCount(Channel channel, String countkey){ |
| try{ |
| //ignore cuncurrent problem? |
| Integer count = (Integer)channel.getAttribute(countkey); |
| if (count == null ){ |
| count = 1; |
| }else { |
| count ++ ; |
| } |
| channel.setAttribute(countkey, count); |
| }catch (Exception e) { |
| logger.error(e.getMessage(), e); |
| } |
| } |
| private static void decreaseInstanceCount(Channel channel, String countkey){ |
| try{ |
| Integer count = (Integer)channel.getAttribute(countkey); |
| if (count == null || count <= 0){ |
| return; |
| }else { |
| count -- ; |
| } |
| channel.setAttribute(countkey, count); |
| }catch (Exception e) { |
| logger.error(e.getMessage(), e); |
| } |
| } |
| |
| public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException{
|
| //encode时可直接获取url
|
| URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl(); |
| byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex); |
| Object[] args = inv.getArguments(); |
| Class<?>[] pts = inv.getParameterTypes(); |
| switch (callbackstatus) { |
| case CallbackServiceCodec.CALLBACK_NONE: |
| return args[paraIndex]; |
| case CallbackServiceCodec.CALLBACK_CREATE: |
| inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex , exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true)); |
| return null; |
| case CallbackServiceCodec.CALLBACK_DESTROY: |
| inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false)); |
| return null; |
| default: |
| return args[paraIndex]; |
| } |
| } |
| public static Object decodeInvocationArgument(Channel channel, RpcInvocation inv, Class<?>[] pts, int paraIndex, Object inObject) throws IOException{ |
| //如果是callback,则创建proxy到客户端,方法的执行可通过channel调用到client端的callback接口
|
| //decode时需要根据channel及env获取url
|
| URL url = null ;
|
| try {
|
| url = DubboProtocol.getDubboProtocol().getInvoker(channel, inv).getUrl();
|
| } catch (RemotingException e) {
|
| if (logger.isInfoEnabled()) {
|
| logger.info(e.getMessage(), e);
|
| }
|
| return inObject;
|
| } |
| byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex); |
| switch (callbackstatus) { |
| case CallbackServiceCodec.CALLBACK_NONE: |
| return inObject; |
| case CallbackServiceCodec.CALLBACK_CREATE: |
| try{ |
| return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true); |
| }catch (Exception e) { |
| logger.error(e.getMessage(), e); |
| throw new IOException(StringUtils.toString(e)); |
| } |
| case CallbackServiceCodec.CALLBACK_DESTROY: |
| try{ |
| return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false); |
| }catch (Exception e) { |
| throw new IOException(StringUtils.toString(e)); |
| } |
| default: |
| return inObject ; |
| } |
| }
|
| } |