blob: cf27f30ddd84b1bcc6c6e09db3f9546514f3970b [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package failback
import (
import (
import (
/**
 * When an invocation fails, record the failed request and schedule it for retry at a regular interval.
 * Especially useful for notification services.
 *
 * <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
 */
type failbackClusterInvoker struct {
once sync.Once
ticker *time.Ticker
maxRetries int64
failbackTasks int64
taskList *queue.Queue
// newFailbackClusterInvoker builds a failbackClusterInvoker on top of the given
// directory and returns it as a protocol.Invoker.
//
// The retry limit is read from the invoker URL under constant.RetriesKey; a value
// that does not parse as an integer, or that is negative, is replaced by
// constant.DefaultFailbackTimesInt. The task limit is read under
// constant.FailBackTasksKey; a non-positive value is replaced by
// constant.DefaultFailbackTasks.
//
// NOTE(review): this view of the file shows no closing braces for the blocks
// below — confirm against the complete file before editing the code.
func newFailbackClusterInvoker(directory directory.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
BaseClusterInvoker: base.NewBaseClusterInvoker(directory),
// The retries parameter is fetched as a string and parsed here.
retriesConfig := invoker.GetURL().GetParam(constant.RetriesKey, constant.DefaultFailbackTimes)
retries, err := strconv.Atoi(retriesConfig)
// Unparsable or negative values are reported and replaced by the default.
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")
retries = constant.DefaultFailbackTimesInt
// The task limit must be strictly positive; anything else falls back to the default.
failbackTasksConfig := invoker.GetURL().GetParamInt(constant.FailBackTasksKey, constant.DefaultFailbackTasks)
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DefaultFailbackTasks
invoker.maxRetries = int64(retries)
invoker.failbackTasks = failbackTasksConfig
return invoker
func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.DoSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
result := retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
retryTask.lastErr = result.Error()
func (invoker *failbackClusterInvoker) process(ctx context.Context) {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
// check each timeout task and re-run
for {
value, err := invoker.taskList.Peek()
if err == queue.ErrDisposed {
if err == queue.ErrEmptyQueue {
retryTask := value.(*retryTimerTask)
if time.Since(retryTask.lastT).Seconds() < 5 {
// ignore return. the get must success.
if _, err = invoker.taskList.Get(1); err != nil {
logger.Warnf("get task found err: %v\n", err)
go invoker.tryTimerTaskProc(ctx, retryTask)
// nolint
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.Directory.List(invocation)
if err := invoker.CheckInvokers(invokers, invocation); err != nil {
logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
invocation.MethodName(), invoker.GetURL().Service(), err)
return &protocol.RPCResult{}
// Get the service loadbalance config
url := invokers[0].GetURL()
lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
// Get the service method loadbalance config if have
methodName := invocation.MethodName()
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); v != "" {
lb = v
loadBalance := extension.GetLoadbalance(lb)
invoked := make([]protocol.Invoker, 0, len(invokers))
ivk := invoker.DoSelect(loadBalance, invocation, invokers, invoked)
result := ivk.Invoke(ctx, invocation)
if result.Error() != nil {
invoker.once.Do(func() {
invoker.taskList = queue.New(invoker.failbackTasks)
go invoker.process(ctx)
taskLen := invoker.taskList.Len()
if taskLen >= invoker.failbackTasks {
logger.Warnf("tasklist is too full > %d.\n", taskLen)
return &protocol.RPCResult{}
timerTask := newRetryTimerTask(loadBalance, invocation, invokers, ivk, invoker)
logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{}
return result
func (invoker *failbackClusterInvoker) Destroy() {
// stop ticker
if invoker.ticker != nil {
_ = invoker.taskList.Dispose()
// retryTimerTask carries everything needed to re-run one failed invocation
// from the background retry loop.
type retryTimerTask struct {
// loadbalance is handed to DoSelect when an invoker is picked for a retry.
loadbalance loadbalance.LoadBalance
// invocation is the request that is invoked again on each retry.
invocation protocol.Invocation
// invokers is the candidate list handed to DoSelect on each retry.
invokers []protocol.Invoker
// lastInvoker is the invoker used by the most recent attempt; it is passed to
// DoSelect as already invoked.
lastInvoker protocol.Invoker
// retries is compared against maxRetries in checkRetry.
retries int64
// maxRetries comes from the invocation's retries attachment when present,
// otherwise from the cluster invoker.
maxRetries int64
// lastT is set when the task is created and again in checkRetry; the retry
// loop leaves a task alone until at least five seconds have passed since lastT.
lastT time.Time
// clusterInvoker is the invoker whose taskList the task is put back on.
clusterInvoker *failbackClusterInvoker
// lastErr is the error returned by the most recent failed retry.
lastErr error
func (t *retryTimerTask) checkRetry() {
logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n",
t.invocation.MethodName(), t.clusterInvoker.GetURL().Service(), t.lastErr)
t.lastT = time.Now()
if t.retries > t.maxRetries {
logger.Errorf("Retry times exceed threshold (%v), invocation-> %v.\n",
t.retries, t.invocation)
if err := t.clusterInvoker.taskList.Put(t); err != nil {
logger.Errorf("invoker.taskList.Put(retryTask:%#v) = error:%v", t, err)
func newRetryTimerTask(loadbalance loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
lastInvoker protocol.Invoker, cInvoker *failbackClusterInvoker) *retryTimerTask {
task := &retryTimerTask{
loadbalance: loadbalance,
invocation: invocation,
invokers: invokers,
lastInvoker: lastInvoker,
lastT: time.Now(),
clusterInvoker: cInvoker,
if retries, ok := invocation.GetAttachment(constant.RetriesKey); ok {
rInt, _ := strconv.Atoi(retries)
task.maxRetries = int64(rInt)
} else {
task.maxRetries = cInvoker.maxRetries
return task