/* | |
* Copyright 1999-2011 Alibaba Group. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.alibaba.dubbo.rpc.cluster.support; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ScheduledFuture; | |
import java.util.concurrent.TimeUnit; | |
import com.alibaba.dubbo.common.logger.Logger; | |
import com.alibaba.dubbo.common.logger.LoggerFactory; | |
import com.alibaba.dubbo.common.utils.NamedThreadFactory; | |
import com.alibaba.dubbo.rpc.Invocation; | |
import com.alibaba.dubbo.rpc.Invoker; | |
import com.alibaba.dubbo.rpc.Result; | |
import com.alibaba.dubbo.rpc.RpcException; | |
import com.alibaba.dubbo.rpc.RpcResult; | |
import com.alibaba.dubbo.rpc.cluster.Directory; | |
import com.alibaba.dubbo.rpc.cluster.LoadBalance; | |
/** | |
* 失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作。 | |
* | |
* @author tony.chenl | |
*/ | |
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> { | |
private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class); | |
private static final long RETRY_FAILED_PERIOD = 5 * 1000; | |
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true)); | |
private volatile ScheduledFuture<?> retryFuture; | |
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>(); | |
public FailbackClusterInvoker(Directory<T> directory){ | |
super(directory); | |
} | |
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { | |
if (retryFuture == null) { | |
synchronized (this) { | |
if (retryFuture == null) { | |
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { | |
public void run() { | |
// 收集统计信息 | |
try { | |
retryFailed(); | |
} catch (Throwable t) { // 防御性容错 | |
logger.error("Unexpected error occur at collect statistic", t); | |
} | |
} | |
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); | |
} | |
} | |
} | |
failed.put(invocation, router); | |
} | |
void retryFailed() { | |
if (failed.size() == 0) { | |
return; | |
} | |
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>( | |
failed).entrySet()) { | |
Invocation invocation = entry.getKey(); | |
Invoker<?> invoker = entry.getValue(); | |
try { | |
invoker.invoke(invocation); | |
failed.remove(invocation); | |
} catch (Throwable e) { | |
logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); | |
} | |
} | |
} | |
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { | |
try { | |
checkInvokers(invokers, invocation); | |
Invoker<T> invoker = select(loadbalance, invocation, invokers, null); | |
return invoker.invoke(invocation); | |
} catch (Throwable e) { | |
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " | |
+ e.getMessage() + ", ", e); | |
addFailed(invocation, this); | |
return new RpcResult(); // ignore | |
} | |
} | |
} |