| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.alibaba.dubbo.rpc.cluster.support; |
| |
| import com.alibaba.dubbo.common.Constants; |
| import com.alibaba.dubbo.common.URL; |
| import com.alibaba.dubbo.common.extension.ExtensionLoader; |
| import com.alibaba.dubbo.common.logger.Logger; |
| import com.alibaba.dubbo.common.logger.LoggerFactory; |
| import com.alibaba.dubbo.common.utils.ConfigUtils; |
| import com.alibaba.dubbo.common.utils.NamedThreadFactory; |
| import com.alibaba.dubbo.rpc.Invocation; |
| import com.alibaba.dubbo.rpc.Invoker; |
| import com.alibaba.dubbo.rpc.Result; |
| import com.alibaba.dubbo.rpc.RpcException; |
| import com.alibaba.dubbo.rpc.RpcInvocation; |
| import com.alibaba.dubbo.rpc.RpcResult; |
| import com.alibaba.dubbo.rpc.cluster.Directory; |
| import com.alibaba.dubbo.rpc.cluster.Merger; |
| import com.alibaba.dubbo.rpc.cluster.merger.MergerFactory; |
| |
| import java.lang.reflect.Array; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Modifier; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| |
| @SuppressWarnings("unchecked") |
| public class MergeableClusterInvoker<T> implements Invoker<T> { |
| |
| private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class); |
| private final Directory<T> directory; |
| private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true)); |
| |
| public MergeableClusterInvoker(Directory<T> directory) { |
| this.directory = directory; |
| } |
| |
| @Override |
| @SuppressWarnings("rawtypes") |
| public Result invoke(final Invocation invocation) throws RpcException { |
| List<Invoker<T>> invokers = directory.list(invocation); |
| |
| String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY); |
| if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group |
| for (final Invoker<T> invoker : invokers) { |
| if (invoker.isAvailable()) { |
| return invoker.invoke(invocation); |
| } |
| } |
| return invokers.iterator().next().invoke(invocation); |
| } |
| |
| Class<?> returnType; |
| try { |
| returnType = getInterface().getMethod( |
| invocation.getMethodName(), invocation.getParameterTypes()).getReturnType(); |
| } catch (NoSuchMethodException e) { |
| returnType = null; |
| } |
| |
| Map<String, Future<Result>> results = new HashMap<String, Future<Result>>(); |
| for (final Invoker<T> invoker : invokers) { |
| Future<Result> future = executor.submit(new Callable<Result>() { |
| @Override |
| public Result call() throws Exception { |
| return invoker.invoke(new RpcInvocation(invocation, invoker)); |
| } |
| }); |
| results.put(invoker.getUrl().getServiceKey(), future); |
| } |
| |
| Object result = null; |
| |
| List<Result> resultList = new ArrayList<Result>(results.size()); |
| |
| int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); |
| for (Map.Entry<String, Future<Result>> entry : results.entrySet()) { |
| Future<Result> future = entry.getValue(); |
| try { |
| Result r = future.get(timeout, TimeUnit.MILLISECONDS); |
| if (r.hasException()) { |
| log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + |
| " failed: " + r.getException().getMessage(), |
| r.getException()); |
| } else { |
| resultList.add(r); |
| } |
| } catch (Exception e) { |
| throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e); |
| } |
| } |
| |
| if (resultList.isEmpty()) { |
| return new RpcResult((Object) null); |
| } else if (resultList.size() == 1) { |
| return resultList.iterator().next(); |
| } |
| |
| if (returnType == void.class) { |
| return new RpcResult((Object) null); |
| } |
| |
| if (merger.startsWith(".")) { |
| merger = merger.substring(1); |
| Method method; |
| try { |
| method = returnType.getMethod(merger, returnType); |
| } catch (NoSuchMethodException e) { |
| throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + |
| returnType.getClass().getName() + " ]"); |
| } |
| if (!Modifier.isPublic(method.getModifiers())) { |
| method.setAccessible(true); |
| } |
| result = resultList.remove(0).getValue(); |
| try { |
| if (method.getReturnType() != void.class |
| && method.getReturnType().isAssignableFrom(result.getClass())) { |
| for (Result r : resultList) { |
| result = method.invoke(result, r.getValue()); |
| } |
| } else { |
| for (Result r : resultList) { |
| method.invoke(result, r.getValue()); |
| } |
| } |
| } catch (Exception e) { |
| throw new RpcException("Can not merge result: " + e.getMessage(), e); |
| } |
| } else { |
| Merger resultMerger; |
| if (ConfigUtils.isDefault(merger)) { |
| resultMerger = MergerFactory.getMerger(returnType); |
| } else { |
| resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); |
| } |
| if (resultMerger != null) { |
| List<Object> rets = new ArrayList<Object>(resultList.size()); |
| for (Result r : resultList) { |
| rets.add(r.getValue()); |
| } |
| result = resultMerger.merge( |
| rets.toArray((Object[]) Array.newInstance(returnType, 0))); |
| } else { |
| throw new RpcException("There is no merger to merge result."); |
| } |
| } |
| return new RpcResult(result); |
| } |
| |
| @Override |
| public Class<T> getInterface() { |
| return directory.getInterface(); |
| } |
| |
| @Override |
| public URL getUrl() { |
| return directory.getUrl(); |
| } |
| |
| @Override |
| public boolean isAvailable() { |
| return directory.isAvailable(); |
| } |
| |
| @Override |
| public void destroy() { |
| directory.destroy(); |
| } |
| |
| private String getGroupDescFromServiceKey(String key) { |
| int index = key.indexOf("/"); |
| if (index > 0) { |
| return "group [ " + key.substring(0, index) + " ]"; |
| } |
| return key; |
| } |
| } |