| /*
|
| * Copyright 1999-2011 Alibaba Group.
|
| *
|
| * Licensed under the Apache License, Version 2.0 (the "License");
|
| * you may not use this file except in compliance with the License.
|
| * You may obtain a copy of the License at
|
| *
|
| * http://www.apache.org/licenses/LICENSE-2.0
|
| *
|
| * Unless required by applicable law or agreed to in writing, software
|
| * distributed under the License is distributed on an "AS IS" BASIS,
|
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| * See the License for the specific language governing permissions and
|
| * limitations under the License.
|
| */
|
| package com.alibaba.dubbo.rpc.cluster.support; |
| |
| import java.util.ArrayList;
|
| import java.util.List;
|
| import java.util.concurrent.BlockingQueue;
|
| import java.util.concurrent.ExecutorService;
|
| import java.util.concurrent.Executors;
|
| import java.util.concurrent.LinkedBlockingQueue;
|
| import java.util.concurrent.TimeUnit;
|
| import java.util.concurrent.atomic.AtomicInteger;
|
|
|
| import com.alibaba.dubbo.common.Constants;
|
| import com.alibaba.dubbo.common.utils.NamedThreadFactory;
|
| import com.alibaba.dubbo.rpc.Invocation;
|
| import com.alibaba.dubbo.rpc.Invoker;
|
| import com.alibaba.dubbo.rpc.Result;
|
| import com.alibaba.dubbo.rpc.RpcContext;
|
| import com.alibaba.dubbo.rpc.RpcException;
|
| import com.alibaba.dubbo.rpc.cluster.Directory;
|
| import com.alibaba.dubbo.rpc.cluster.LoadBalance;
|
| |
| /** |
| * 并行调用,只要一个成功即返回,通常用于实时性要求较高的操作,但需要浪费更多服务资源。
|
| *
|
| * <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a> |
| * |
| * @author william.liangf |
| */ |
| public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T>{ |
|
|
| private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));
|
| |
| public ForkingClusterInvoker(Directory<T> directory) { |
| super(directory); |
| } |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" })
|
| public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
|
| checkInvokers(invokers, invocation);
|
| final List<Invoker<T>> selected; |
| final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); |
| final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); |
| if (forks <= 0 || forks >= invokers.size()) { |
| selected = invokers; |
| } else { |
| selected = new ArrayList<Invoker<T>>(); |
| for (int i = 0; i < forks; i++) { |
| //在invoker列表(排除selected)后,如果没有选够,则存在重复循环问题.见select实现. |
| Invoker<T> invoker = select(loadbalance, invocation, invokers, selected); |
| if(!selected.contains(invoker)){//防止重复添加invoker |
| selected.add(invoker); |
| } |
| } |
| }
|
| RpcContext.getContext().setInvokers((List)selected); |
| final AtomicInteger count = new AtomicInteger(); |
| final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>(); |
| for (final Invoker<T> invoker : selected) { |
| executor.execute(new Runnable() { |
| public void run() { |
| try { |
| Result result = invoker.invoke(invocation); |
| ref.offer(result); |
| } catch(Throwable e) { |
| int value = count.incrementAndGet(); |
| if (value >= selected.size()) { |
| ref.offer(e); |
| } |
| } |
| } |
| }); |
| } |
| try { |
| Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
|
| if (ret instanceof Throwable) {
|
| Throwable e = (Throwable) ret;
|
| throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
|
| }
|
| return (Result) ret; |
| } catch (InterruptedException e) { |
| throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); |
| } |
| } |
| } |