// blob: 8e3b682292e1e9c42b8c11b40aaea518eaeacddd [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.registry.support;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.registry.NotifyListener;
/**
 * FailbackRegistry. (SPI, Prototype, ThreadSafe)
 *
 * <p>Base registry that remembers every register / unregister / subscribe /
 * unsubscribe / notify operation that failed and re-attempts it periodically
 * from a scheduled task until it succeeds. Concrete registries implement the
 * {@code doXxx} template methods at the bottom of this class.
 *
 * @author william.liangf
 */
public abstract class FailbackRegistry extends AbstractRegistry {

    // Scheduled executor (pool size 1) that runs the periodic retry task.
    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

    // Failure-retry timer: periodically checks for failed requests and retries them without limit.
    private final ScheduledFuture<?> retryFuture;

    // URLs whose register request failed and is waiting for retry.
    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

    // URLs whose unregister request failed and is waiting for retry.
    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();

    // Listeners, per subscribed URL, whose subscribe request failed.
    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

    // Listeners, per subscribed URL, whose unsubscribe request failed.
    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

    // Pending notifications (listener -> URL list), per subscribed URL, whose delivery failed.
    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();

    /**
     * Creates the registry and schedules the periodic retry task.
     *
     * <p>The retry period is read from the {@link Constants#REGISTRY_RETRY_PERIOD_KEY}
     * parameter of {@code url} (falling back to
     * {@link Constants#DEFAULT_REGISTRY_RETRY_PERIOD}) and is interpreted in
     * milliseconds; it is used both as the initial delay and as the delay
     * between runs.
     *
     * @param url the registry URL
     */
    public FailbackRegistry(URL url) {
        super(url);
        int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // Check the failure lists and re-send the failed requests to the registry.
                try {
                    retry();
                } catch (Throwable t) { // Defensive: an escaping exception would cancel all future runs.
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

    /** @return the handle of the scheduled retry task */
    public Future<?> getRetryFuture() {
        return retryFuture;
    }

    /** @return the live set of URLs whose registration is waiting for retry */
    public Set<URL> getFailedRegistered() {
        return failedRegistered;
    }

    /** @return the live set of URLs whose unregistration is waiting for retry */
    public Set<URL> getFailedUnregistered() {
        return failedUnregistered;
    }

    /** @return the live map of listeners whose subscription is waiting for retry */
    public Map<URL, Set<NotifyListener>> getFailedSubscribed() {
        return failedSubscribed;
    }

    /** @return the live map of listeners whose unsubscription is waiting for retry */
    public Map<URL, Set<NotifyListener>> getFailedUnsubscribed() {
        return failedUnsubscribed;
    }

    /** @return the live map of notifications whose delivery is waiting for retry */
    public Map<URL, Map<NotifyListener, List<URL>>> getFailedNotified() {
        return failedNotified;
    }

    // Returns the listener set stored for url in the given failure map, creating it atomically if absent.
    private static Set<NotifyListener> listenersFor(ConcurrentMap<URL, Set<NotifyListener>> failures, URL url) {
        Set<NotifyListener> listeners = failures.get(url);
        if (listeners == null) {
            Set<NotifyListener> created = new ConcurrentHashSet<NotifyListener>();
            // putIfAbsent returns the value that won the race, or null when ours was stored.
            listeners = failures.putIfAbsent(url, created);
            if (listeners == null) {
                listeners = created;
            }
        }
        return listeners;
    }

    // Copies the entries of a failure map whose listener set is non-empty; the values stay the live sets.
    private static Map<URL, Set<NotifyListener>> nonEmptyEntries(Map<URL, Set<NotifyListener>> failures) {
        Map<URL, Set<NotifyListener>> snapshot = new HashMap<URL, Set<NotifyListener>>();
        for (Map.Entry<URL, Set<NotifyListener>> entry : failures.entrySet()) {
            Set<NotifyListener> listeners = entry.getValue();
            if (listeners != null && !listeners.isEmpty()) {
                snapshot.put(entry.getKey(), listeners);
            }
        }
        return snapshot;
    }

    // True when both the registry URL and the given URL have the "check" parameter enabled (default true).
    private boolean isCheckEnabled(URL url) {
        return getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true);
    }

    // Records a failed subscription so that the retry task picks it up.
    private void addFailedSubscribed(URL url, NotifyListener listener) {
        listenersFor(failedSubscribed, url).add(listener);
    }

    // Drops every pending subscribe / unsubscribe / notify retry recorded for the (url, listener) pair.
    private void removeFailedSubscribed(URL url, NotifyListener listener) {
        Set<NotifyListener> listeners = failedSubscribed.get(url);
        if (listeners != null) {
            listeners.remove(listener);
        }
        listeners = failedUnsubscribed.get(url);
        if (listeners != null) {
            listeners.remove(listener);
        }
        Map<NotifyListener, List<URL>> notified = failedNotified.get(url);
        if (notified != null) {
            notified.remove(listener);
        }
    }

    /**
     * Registers {@code url}; on failure the URL is recorded for periodic retry.
     *
     * @param url the URL to register
     * @throws IllegalStateException if the request fails while "check" is
     *         enabled on both the registry URL and {@code url} and
     *         {@code url} is not a consumer URL
     */
    @Override
    public void register(URL url) {
        super.register(url);
        // A new request supersedes any pending retry for the same URL.
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Send the register request to the registry server.
            doRegister(url);
        } catch (Exception t) {
            // Record the failed request in the failure list for periodic retry.
            failedRegistered.add(url);
            if (isCheckEnabled(url)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol())) { // Check is enabled: fail fast.
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }
    }

    /**
     * Unregisters {@code url}; on failure the URL is recorded for periodic retry.
     *
     * @param url the URL to unregister
     * @throws IllegalStateException if the request fails while "check" is
     *         enabled on both the registry URL and {@code url} and
     *         {@code url} is not a consumer URL
     */
    @Override
    public void unregister(URL url) {
        super.unregister(url);
        // A new request supersedes any pending retry for the same URL.
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Send the unregister request to the registry server.
            doUnregister(url);
        } catch (Exception t) {
            // Record the failed request in the failure list for periodic retry.
            failedUnregistered.add(url);
            if (isCheckEnabled(url)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol())) { // Check is enabled: fail fast.
                throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }
    }

    /**
     * Subscribes {@code listener} to {@code url}; on failure the subscription
     * is recorded for periodic retry and, when {@code getCacheUrls(url)}
     * returns a non-empty list, the listener is notified with that list.
     *
     * @param url the URL to subscribe to
     * @param listener the listener to notify
     * @throws IllegalStateException if the request fails, no cached URLs are
     *         available, and "check" is enabled on both the registry URL and
     *         {@code url}
     */
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Send the subscribe request to the registry server.
            doSubscribe(url, listener);
        } catch (Exception t) {
            // Record the failed request in the failure list for periodic retry.
            addFailedSubscribed(url, listener);
            List<URL> urls = getCacheUrls(url);
            if (urls != null && urls.size() > 0) {
                // Fall back to the cached list so the caller can proceed while the registry is unavailable.
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else if (isCheckEnabled(url)) { // Check is enabled: fail fast.
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }
    }

    /**
     * Unsubscribes {@code listener} from {@code url}; on failure the request
     * is recorded for periodic retry.
     *
     * @param url the URL to unsubscribe from
     * @param listener the listener to remove
     * @throws IllegalStateException if the request fails while "check" is
     *         enabled on both the registry URL and {@code url}
     */
    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        super.unsubscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Send the unsubscribe request to the registry server.
            doUnsubscribe(url, listener);
        } catch (Exception t) {
            // Record the failed request in the failure list for periodic retry.
            listenersFor(failedUnsubscribed, url).add(listener);
            if (isCheckEnabled(url)) { // Check is enabled: fail fast.
                throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }
    }

    /**
     * Delivers {@code urls} to {@code listener} through the superclass; if
     * that throws, the notification is recorded for periodic retry instead
     * of propagating the exception.
     *
     * @param url the subscribed URL the notification belongs to
     * @param listener the listener to notify
     * @param urls the URL list to deliver
     * @throws IllegalArgumentException if {@code url} or {@code listener} is null
     */
    @Override
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            super.notify(url, listener, urls);
        } catch (Exception t) {
            // Record the failed notification in the failure list for periodic retry.
            Map<NotifyListener, List<URL>> pending = failedNotified.get(url);
            if (pending == null) {
                Map<NotifyListener, List<URL>> created = new ConcurrentHashMap<NotifyListener, List<URL>>();
                // putIfAbsent returns the value that won the race, or null when ours was stored.
                pending = failedNotified.putIfAbsent(url, created);
                if (pending == null) {
                    pending = created;
                }
            }
            pending.put(listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }

    /**
     * Queues everything currently registered and subscribed into the failure
     * lists, so that the retry task re-sends it.
     */
    @Override
    protected void recover() throws Exception {
        // register
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            failedRegistered.addAll(recoverRegistered);
        }
        // subscribe
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }

    /**
     * Re-attempts every recorded failure: register, unregister, subscribe,
     * unsubscribe and notify, in that order. An item is removed from its
     * failure list only after its retry succeeds; a failing item is logged
     * and left in place for the next run.
     */
    protected void retry() {
        retryFailedRegistered();
        retryFailedUnregistered();
        retryFailedSubscribed();
        retryFailedUnsubscribed();
        retryFailedNotified();
    }

    // Re-sends the register request for every URL in failedRegistered.
    private void retryFailedRegistered() {
        Set<URL> failed = new HashSet<URL>(failedRegistered);
        if (failed.isEmpty()) {
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Retry register " + failed);
        }
        for (URL url : failed) {
            try {
                doRegister(url);
                failedRegistered.remove(url);
            } catch (Throwable t) { // Ignore every failure and wait for the next retry.
                logger.warn("Failed to retry register " + url + ", waiting for again, cause: " + t.getMessage(), t);
            }
        }
    }

    // Re-sends the unregister request for every URL in failedUnregistered.
    private void retryFailedUnregistered() {
        Set<URL> failed = new HashSet<URL>(failedUnregistered);
        if (failed.isEmpty()) {
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Retry unregister " + failed);
        }
        for (URL url : failed) {
            try {
                doUnregister(url);
                failedUnregistered.remove(url);
            } catch (Throwable t) { // Ignore every failure and wait for the next retry.
                logger.warn("Failed to retry unregister " + url + ", waiting for again, cause: " + t.getMessage(), t);
            }
        }
    }

    // Re-sends the subscribe request for every (url, listener) pair in failedSubscribed.
    private void retryFailedSubscribed() {
        Map<URL, Set<NotifyListener>> failed = nonEmptyEntries(failedSubscribed);
        if (failed.isEmpty()) {
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Retry subscribe " + failed);
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
            URL url = entry.getKey();
            Set<NotifyListener> listeners = entry.getValue();
            // Iterate over a copy so that removing from the live set never disturbs the iteration.
            for (NotifyListener listener : new HashSet<NotifyListener>(listeners)) {
                try {
                    doSubscribe(url, listener);
                    listeners.remove(listener);
                } catch (Throwable t) { // Ignore every failure and wait for the next retry.
                    logger.warn("Failed to retry subscribe " + url + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
    }

    // Re-sends the unsubscribe request for every (url, listener) pair in failedUnsubscribed.
    private void retryFailedUnsubscribed() {
        Map<URL, Set<NotifyListener>> failed = nonEmptyEntries(failedUnsubscribed);
        if (failed.isEmpty()) {
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Retry unsubscribe " + failed);
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
            URL url = entry.getKey();
            Set<NotifyListener> listeners = entry.getValue();
            // Iterate over a copy so that removing from the live set never disturbs the iteration.
            for (NotifyListener listener : new HashSet<NotifyListener>(listeners)) {
                try {
                    doUnsubscribe(url, listener);
                    listeners.remove(listener);
                } catch (Throwable t) { // Ignore every failure and wait for the next retry.
                    logger.warn("Failed to retry unsubscribe " + url + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
    }

    // Re-delivers every pending notification in failedNotified directly to its listener.
    private void retryFailedNotified() {
        Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>();
        for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : failedNotified.entrySet()) {
            Map<NotifyListener, List<URL>> pending = entry.getValue();
            if (pending != null && !pending.isEmpty()) {
                failed.put(entry.getKey(), pending);
            }
        }
        if (failed.isEmpty()) {
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Retry notify " + failed);
        }
        for (Map.Entry<URL, Map<NotifyListener, List<URL>>> failedEntry : failed.entrySet()) {
            URL url = failedEntry.getKey();
            Map<NotifyListener, List<URL>> pending = failedEntry.getValue();
            // Iterate over a copy so that removing from the live map never disturbs the iteration.
            for (Map.Entry<NotifyListener, List<URL>> entry : new HashMap<NotifyListener, List<URL>>(pending).entrySet()) {
                NotifyListener listener = entry.getKey();
                try {
                    listener.notify(entry.getValue());
                    pending.remove(listener);
                } catch (Throwable t) { // Ignore every failure and wait for the next retry.
                    logger.warn("Failed to retry notify " + url + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
    }

    /**
     * Destroys the registry, cancels the retry task and shuts down the retry
     * executor owned by this instance so that its thread is released.
     */
    @Override
    public void destroy() {
        super.destroy();
        try {
            retryFuture.cancel(true);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        try {
            // The executor is private to this instance; without this its worker thread would outlive the registry.
            retryExecutor.shutdownNow();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }

    // ==== Template methods ====

    /** Sends the register request for {@code url} to the registry; throws on failure. */
    protected abstract void doRegister(URL url);

    /** Sends the unregister request for {@code url} to the registry; throws on failure. */
    protected abstract void doUnregister(URL url);

    /** Sends the subscribe request for {@code url} and {@code listener} to the registry; throws on failure. */
    protected abstract void doSubscribe(URL url, NotifyListener listener);

    /** Sends the unsubscribe request for {@code url} and {@code listener} to the registry; throws on failure. */
    protected abstract void doUnsubscribe(URL url, NotifyListener listener);
}