blob: db158c7133757c3f6692a02656881a45e5cef049 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.support;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_FAILBACK_TIMES;
import static org.apache.dubbo.common.constants.CommonConstants.RETRIES_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_FAILBACK_TASKS;
import static org.apache.dubbo.rpc.cluster.Constants.FAIL_BACK_TASKS_KEY;
/**
* When fails, record failure requests and schedule for retry on a regular interval.
* Especially useful for services of notification.
*
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
*/
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(FailbackClusterInvoker.class);
private static final long RETRY_FAILED_PERIOD = 5;
/**
* Number of retries obtained from the configuration, don't contain the first invoke.
*/
private final int retries;
private final int failbackTasks;
private volatile Timer failTimer;
public FailbackClusterInvoker(Directory<T> directory) {
super(directory);
int retriesConfig = getUrl().getParameter(RETRIES_KEY, DEFAULT_FAILBACK_TIMES);
if (retriesConfig < 0) {
retriesConfig = DEFAULT_FAILBACK_TIMES;
}
int failbackTasksConfig = getUrl().getParameter(FAIL_BACK_TASKS_KEY, DEFAULT_FAILBACK_TASKS);
if (failbackTasksConfig <= 0) {
failbackTasksConfig = DEFAULT_FAILBACK_TASKS;
}
retries = retriesConfig;
failbackTasks = failbackTasksConfig;
}
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, URL consumerUrl) {
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
failTimer = new HashedWheelTimer(
new NamedThreadFactory("failback-cluster-timer", true),
1,
TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD, consumerUrl);
try {
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error("2-9","add newTimeout exception","","Failback background works error, invocation->" + invocation + ", exception: " + e.getMessage(),e);
}
}
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker<T> invoker = null;
URL consumerUrl = RpcContext.getServiceContext().getConsumerUrl();
try {
checkInvokers(invokers, invocation);
invoker = select(loadbalance, invocation, invokers, null);
// Asynchronous call method must be used here, because failback will retry in the background.
// Then the serviceContext will be cleared after the call is completed.
return invokeWithContextAsync(invoker, invocation, consumerUrl);
} catch (Throwable e) {
logger.error("2-10","Failback to invoke method and start to retries","","Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ",e);
if (retries > 0) {
addFailed(loadbalance, invocation, invokers, invoker, consumerUrl);
}
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
@Override
public void destroy() {
super.destroy();
if (failTimer != null) {
failTimer.stop();
}
}
/**
* RetryTimerTask
*/
private class RetryTimerTask implements TimerTask {
private final Invocation invocation;
private final LoadBalance loadbalance;
private final List<Invoker<T>> invokers;
private final long tick;
private Invoker<T> lastInvoker;
private URL consumerUrl;
/**
* Number of retries obtained from the configuration, don't contain the first invoke.
*/
private final int retries;
/**
* Number of retried.
*/
private int retriedTimes = 0;
RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker,
int retries, long tick, URL consumerUrl) {
this.loadbalance = loadbalance;
this.invocation = invocation;
this.invokers = invokers;
this.retries = retries;
this.tick = tick;
this.lastInvoker = lastInvoker;
this.consumerUrl = consumerUrl;
}
@Override
public void run(Timeout timeout) {
try {
logger.info("Attempt to retry to invoke method " + invocation.getMethodName() +
". The total will retry " + retries + " times, the current is the " + retriedTimes + " retry");
Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
lastInvoker = retryInvoker;
invokeWithContextAsync(retryInvoker, invocation, consumerUrl);
} catch (Throwable e) {
logger.error("2-10","Failed retry to invoke method","","Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.",e);
if ((++retriedTimes) >= retries) {
logger.error("2-10","Failed retry to invoke method and retry times exceed threshold","","Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation,e);
} else {
rePut(timeout);
}
}
}
private void rePut(Timeout timeout) {
if (timeout == null) {
return;
}
Timer timer = timeout.timer();
if (timer.isStop() || timeout.isCancelled()) {
return;
}
timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
}
}
}