// blob: 25e717878d115dbe7a70080cb30588f67fca8094 [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.cluster.support;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.cluster.Directory;
import com.alibaba.dubbo.rpc.cluster.LoadBalance;
/**
 * Cluster invoker that calls several providers in parallel and returns as soon as one of
 * them succeeds. Typically used for operations with strict latency requirements, at the
 * cost of consuming more provider resources.
 *
 * <p>The number of parallel calls is read from the URL parameter {@code Constants.FORKS_KEY}
 * and the maximum wait from {@code Constants.TIMEOUT_KEY}.
 *
 * @author william.liangf
 */
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    /**
     * Pool on which the forked invocations run.
     * NOTE(review): the {@code true} flag presumably makes the threads daemon threads;
     * confirm against NamedThreadFactory. The pool is never shut down in this class.
     */
    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    /**
     * Invokes up to {@code forks} providers concurrently and returns the first result that
     * becomes available.
     *
     * @param invocation  the call to forward to every selected provider
     * @param invokers    candidate providers; must be non-empty
     * @param loadbalance strategy used to pick the subset of providers to fork to
     * @return the first successful result, or a result wrapping the last failure once every
     *         forked call has failed; never {@code null}
     * @throws RpcException if there is no provider, if no forked call completes within the
     *                      configured timeout, or if the waiting thread is interrupted
     */
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        if (invokers == null || invokers.isEmpty()) {
            throw new RpcException("No provider available for service " + getInterface().getName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", Please check whether the service do exist or version is right firstly, and check the provider has started.");
        }
        final List<Invoker<T>> selected;
        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (forks <= 0 || forks >= invokers.size()) {
            // Non-positive or oversized fork count: fan out to every provider.
            selected = invokers;
        } else {
            selected = new ArrayList<Invoker<T>>();
            for (int i = 0; i < forks; i++) {
                // select() may hand back an invoker that was already chosen when it cannot
                // find enough fresh candidates (see the select implementation), so fewer
                // than "forks" invokers may end up selected.
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                if (!selected.contains(invoker)) { // avoid adding the same invoker twice
                    selected.add(invoker);
                }
            }
        }
        final AtomicInteger count = new AtomicInteger();
        final BlockingQueue<Result> ref = new LinkedBlockingQueue<Result>();
        for (final Invoker<T> invoker : selected) {
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        // Report a failure only once every forked call has failed, so that a
                        // single slow success is not pre-empted by an early error.
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(new RpcResult(e));
                        }
                    }
                }
            });
        }
        try {
            Result result = ref.poll(timeout, TimeUnit.MILLISECONDS);
            if (result == null) {
                // poll() returns null when the wait times out; surface that as an explicit
                // failure instead of handing a null Result back to the caller.
                throw new RpcException("Failed to forking invoke provider " + selected + ", cause: no result received within " + timeout + " ms");
            }
            return result;
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RpcException("Failed to forking invoke provider " + selected + ", cause: " + e.getMessage(), e);
        }
    }
}