| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.rpc.cluster.support; |
| |
| import org.apache.dubbo.common.Version; |
| import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.utils.NetUtils; |
| import org.apache.dubbo.rpc.Invocation; |
| import org.apache.dubbo.rpc.Invoker; |
| import org.apache.dubbo.rpc.Result; |
| import org.apache.dubbo.rpc.RpcContext; |
| import org.apache.dubbo.rpc.RpcException; |
| import org.apache.dubbo.rpc.cluster.Directory; |
| import org.apache.dubbo.rpc.cluster.LoadBalance; |
| import org.apache.dubbo.rpc.support.RpcUtils; |
| |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_RETRIES; |
| import static org.apache.dubbo.common.constants.CommonConstants.RETRIES_KEY; |
| import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_MULTIPLE_RETRIES; |
| |
| /** |
| * When invoke fails, log the initial error and retry other invokers (retry n times, which means at most n different invokers will be invoked) |
| * Note that retry causes latency. |
| * <p> |
| * <a href="http://en.wikipedia.org/wiki/Failover">Failover</a> |
| * |
| */ |
| public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { |
| |
| private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(FailoverClusterInvoker.class); |
| |
| public FailoverClusterInvoker(Directory<T> directory) { |
| super(directory); |
| } |
| |
| @Override |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { |
| List<Invoker<T>> copyInvokers = invokers; |
| checkInvokers(copyInvokers, invocation); |
| String methodName = RpcUtils.getMethodName(invocation); |
| int len = calculateInvokeTimes(methodName); |
| // retry loop. |
| RpcException le = null; // last exception. |
| List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers. |
| Set<String> providers = new HashSet<String>(len); |
| for (int i = 0; i < len; i++) { |
| //Reselect before retry to avoid a change of candidate `invokers`. |
| //NOTE: if `invokers` changed, then `invoked` also lose accuracy. |
| if (i > 0) { |
| checkWhetherDestroyed(); |
| copyInvokers = list(invocation); |
| // check again |
| checkInvokers(copyInvokers, invocation); |
| } |
| Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); |
| invoked.add(invoker); |
| RpcContext.getServiceContext().setInvokers((List) invoked); |
| boolean success = false; |
| try { |
| Result result = invokeWithContext(invoker, invocation); |
| if (le != null && logger.isWarnEnabled()) { |
| logger.warn(CLUSTER_FAILED_MULTIPLE_RETRIES,"failed to retry do invoke","","Although retry the method " + methodName |
| + " in the service " + getInterface().getName() |
| + " was successful by the provider " + invoker.getUrl().getAddress() |
| + ", but there have been failed providers " + providers |
| + " (" + providers.size() + "/" + copyInvokers.size() |
| + ") from the registry " + directory.getUrl().getAddress() |
| + " on the consumer " + NetUtils.getLocalHost() |
| + " using the dubbo version " + Version.getVersion() + ". Last error is: " |
| + le.getMessage(),le); |
| } |
| success = true; |
| return result; |
| } catch (RpcException e) { |
| if (e.isBiz()) { // biz exception. |
| throw e; |
| } |
| le = e; |
| } catch (Throwable e) { |
| le = new RpcException(e.getMessage(), e); |
| } finally { |
| if (!success) { |
| providers.add(invoker.getUrl().getAddress()); |
| } |
| } |
| } |
| throw new RpcException(le.getCode(), "Failed to invoke the method " |
| + methodName + " in the service " + getInterface().getName() |
| + ". Tried " + len + " times of the providers " + providers |
| + " (" + providers.size() + "/" + copyInvokers.size() |
| + ") from the registry " + directory.getUrl().getAddress() |
| + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " |
| + Version.getVersion() + ". Last error is: " |
| + le.getMessage(), le.getCause() != null ? le.getCause() : le); |
| } |
| |
| private int calculateInvokeTimes(String methodName) { |
| int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; |
| RpcContext rpcContext = RpcContext.getClientAttachment(); |
| Object retry = rpcContext.getObjectAttachment(RETRIES_KEY); |
| if (retry instanceof Number) { |
| len = ((Number) retry).intValue() + 1; |
| rpcContext.removeAttachment(RETRIES_KEY); |
| } |
| if (len <= 0) { |
| len = 1; |
| } |
| |
| return len; |
| } |
| |
| } |