/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.registry.support; | |
import org.apache.dubbo.common.URL; | |
import org.apache.dubbo.common.timer.HashedWheelTimer; | |
import org.apache.dubbo.common.utils.CollectionUtils; | |
import org.apache.dubbo.common.utils.NamedThreadFactory; | |
import org.apache.dubbo.registry.NotifyListener; | |
import org.apache.dubbo.registry.retry.FailedRegisteredTask; | |
import org.apache.dubbo.registry.retry.FailedSubscribedTask; | |
import org.apache.dubbo.registry.retry.FailedUnregisteredTask; | |
import org.apache.dubbo.registry.retry.FailedUnsubscribedTask; | |
import org.apache.dubbo.remoting.Constants; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.TimeUnit; | |
import static org.apache.dubbo.common.constants.CommonConstants.FILE_KEY; | |
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL; | |
import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RETRY_PERIOD; | |
import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY; | |
/** | |
* FailbackRegistry. (SPI, Prototype, ThreadSafe) | |
*/ | |
public abstract class FailbackRegistry extends AbstractRegistry { | |
/* retry task map */ | |
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>(); | |
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>(); | |
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>(); | |
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>(); | |
/** | |
* The time in milliseconds the retryExecutor will wait | |
*/ | |
private final int retryPeriod; | |
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry | |
private final HashedWheelTimer retryTimer; | |
public FailbackRegistry(URL url) { | |
super(url); | |
this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD); | |
// since the retry task will not be very much. 128 ticks is enough. | |
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); | |
} | |
public void removeFailedRegisteredTask(URL url) { | |
failedRegistered.remove(url); | |
} | |
public void removeFailedUnregisteredTask(URL url) { | |
failedUnregistered.remove(url); | |
} | |
public void removeFailedSubscribedTask(URL url, NotifyListener listener) { | |
Holder h = new Holder(url, listener); | |
failedSubscribed.remove(h); | |
} | |
public void removeFailedUnsubscribedTask(URL url, NotifyListener listener) { | |
Holder h = new Holder(url, listener); | |
failedUnsubscribed.remove(h); | |
} | |
private void addFailedRegistered(URL url) { | |
FailedRegisteredTask oldOne = failedRegistered.get(url); | |
if (oldOne != null) { | |
return; | |
} | |
FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); | |
oldOne = failedRegistered.putIfAbsent(url, newTask); | |
if (oldOne == null) { | |
// never has a retry task. then start a new task for retry. | |
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); | |
} | |
} | |
private void removeFailedRegistered(URL url) { | |
FailedRegisteredTask f = failedRegistered.remove(url); | |
if (f != null) { | |
f.cancel(); | |
} | |
} | |
private void addFailedUnregistered(URL url) { | |
FailedUnregisteredTask oldOne = failedUnregistered.get(url); | |
if (oldOne != null) { | |
return; | |
} | |
FailedUnregisteredTask newTask = new FailedUnregisteredTask(url, this); | |
oldOne = failedUnregistered.putIfAbsent(url, newTask); | |
if (oldOne == null) { | |
// never has a retry task. then start a new task for retry. | |
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); | |
} | |
} | |
private void removeFailedUnregistered(URL url) { | |
FailedUnregisteredTask f = failedUnregistered.remove(url); | |
if (f != null) { | |
f.cancel(); | |
} | |
} | |
protected void addFailedSubscribed(URL url, NotifyListener listener) { | |
Holder h = new Holder(url, listener); | |
FailedSubscribedTask oldOne = failedSubscribed.get(h); | |
if (oldOne != null) { | |
return; | |
} | |
FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener); | |
oldOne = failedSubscribed.putIfAbsent(h, newTask); | |
if (oldOne == null) { | |
// never has a retry task. then start a new task for retry. | |
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); | |
} | |
} | |
public void removeFailedSubscribed(URL url, NotifyListener listener) { | |
Holder h = new Holder(url, listener); | |
FailedSubscribedTask f = failedSubscribed.remove(h); | |
if (f != null) { | |
f.cancel(); | |
} | |
removeFailedUnsubscribed(url, listener); | |
} | |
private void addFailedUnsubscribed(URL url, NotifyListener listener) { | |
Holder h = new Holder(url, listener); | |
FailedUnsubscribedTask oldOne = failedUnsubscribed.get(h); | |
if (oldOne != null) { | |
return; | |
} | |
FailedUnsubscribedTask newTask = new FailedUnsubscribedTask(url, this, listener); | |
oldOne = failedUnsubscribed.putIfAbsent(h, newTask); | |
if (oldOne == null) { | |
// never has a retry task. then start a new task for retry. | |
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); | |
} | |
} | |
private void removeFailedUnsubscribed(URL url, NotifyListener listener) { | |
Holder h = new Holder(url, listener); | |
FailedUnsubscribedTask f = failedUnsubscribed.remove(h); | |
if (f != null) { | |
f.cancel(); | |
} | |
} | |
ConcurrentMap<URL, FailedRegisteredTask> getFailedRegistered() { | |
return failedRegistered; | |
} | |
ConcurrentMap<URL, FailedUnregisteredTask> getFailedUnregistered() { | |
return failedUnregistered; | |
} | |
ConcurrentMap<Holder, FailedSubscribedTask> getFailedSubscribed() { | |
return failedSubscribed; | |
} | |
ConcurrentMap<Holder, FailedUnsubscribedTask> getFailedUnsubscribed() { | |
return failedUnsubscribed; | |
} | |
@Override | |
public void register(URL url) { | |
if (!acceptable(url)) { | |
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type."); | |
return; | |
} | |
super.register(url); | |
removeFailedRegistered(url); | |
removeFailedUnregistered(url); | |
try { | |
// Sending a registration request to the server side | |
doRegister(url); | |
} catch (Exception e) { | |
Throwable t = e; | |
// If the startup detection is opened, the Exception is thrown directly. | |
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) | |
&& url.getParameter(Constants.CHECK_KEY, true) | |
&& !CONSUMER_PROTOCOL.equals(url.getProtocol()); | |
boolean skipFailback = t instanceof SkipFailbackWrapperException; | |
if (check || skipFailback) { | |
if (skipFailback) { | |
t = t.getCause(); | |
} | |
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); | |
} else { | |
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); | |
} | |
// Record a failed registration request to a failed list, retry regularly | |
addFailedRegistered(url); | |
} | |
} | |
@Override | |
public void reExportRegister(URL url) { | |
if (!acceptable(url)) { | |
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type."); | |
return; | |
} | |
super.register(url); | |
removeFailedRegistered(url); | |
removeFailedUnregistered(url); | |
try { | |
// Sending a registration request to the server side | |
doRegister(url); | |
} catch (Exception e) { | |
if (!(e instanceof SkipFailbackWrapperException)) { | |
throw new IllegalStateException("Failed to register (re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e); | |
} | |
} | |
} | |
@Override | |
public void unregister(URL url) { | |
super.unregister(url); | |
removeFailedRegistered(url); | |
removeFailedUnregistered(url); | |
try { | |
// Sending a cancellation request to the server side | |
doUnregister(url); | |
} catch (Exception e) { | |
Throwable t = e; | |
// If the startup detection is opened, the Exception is thrown directly. | |
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) | |
&& url.getParameter(Constants.CHECK_KEY, true) | |
&& !CONSUMER_PROTOCOL.equals(url.getProtocol()); | |
boolean skipFailback = t instanceof SkipFailbackWrapperException; | |
if (check || skipFailback) { | |
if (skipFailback) { | |
t = t.getCause(); | |
} | |
throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); | |
} else { | |
logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t); | |
} | |
// Record a failed registration request to a failed list, retry regularly | |
addFailedUnregistered(url); | |
} | |
} | |
@Override | |
public void reExportUnregister(URL url) { | |
super.unregister(url); | |
removeFailedRegistered(url); | |
removeFailedUnregistered(url); | |
try { | |
// Sending a cancellation request to the server side | |
doUnregister(url); | |
} catch (Exception e) { | |
if (!(e instanceof SkipFailbackWrapperException)) { | |
throw new IllegalStateException("Failed to unregister(re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e); | |
} | |
} | |
} | |
@Override | |
public void subscribe(URL url, NotifyListener listener) { | |
super.subscribe(url, listener); | |
removeFailedSubscribed(url, listener); | |
try { | |
// Sending a subscription request to the server side | |
doSubscribe(url, listener); | |
} catch (Exception e) { | |
Throwable t = e; | |
List<URL> urls = getCacheUrls(url); | |
if (CollectionUtils.isNotEmpty(urls)) { | |
notify(url, listener, urls); | |
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); | |
} else { | |
// If the startup detection is opened, the Exception is thrown directly. | |
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) | |
&& url.getParameter(Constants.CHECK_KEY, true); | |
boolean skipFailback = t instanceof SkipFailbackWrapperException; | |
if (check || skipFailback) { | |
if (skipFailback) { | |
t = t.getCause(); | |
} | |
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); | |
} else { | |
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); | |
} | |
} | |
// Record a failed registration request to a failed list, retry regularly | |
addFailedSubscribed(url, listener); | |
} | |
} | |
@Override | |
public void unsubscribe(URL url, NotifyListener listener) { | |
super.unsubscribe(url, listener); | |
removeFailedSubscribed(url, listener); | |
try { | |
// Sending a canceling subscription request to the server side | |
doUnsubscribe(url, listener); | |
} catch (Exception e) { | |
Throwable t = e; | |
// If the startup detection is opened, the Exception is thrown directly. | |
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) | |
&& url.getParameter(Constants.CHECK_KEY, true); | |
boolean skipFailback = t instanceof SkipFailbackWrapperException; | |
if (check || skipFailback) { | |
if (skipFailback) { | |
t = t.getCause(); | |
} | |
throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); | |
} else { | |
logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); | |
} | |
// Record a failed registration request to a failed list, retry regularly | |
addFailedUnsubscribed(url, listener); | |
} | |
} | |
@Override | |
protected void notify(URL url, NotifyListener listener, List<URL> urls) { | |
if (url == null) { | |
throw new IllegalArgumentException("notify url == null"); | |
} | |
if (listener == null) { | |
throw new IllegalArgumentException("notify listener == null"); | |
} | |
try { | |
doNotify(url, listener, urls); | |
} catch (Exception t) { | |
// Record a failed registration request to a failed list | |
logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t); | |
} | |
} | |
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { | |
super.notify(url, listener, urls); | |
} | |
@Override | |
protected void recover() throws Exception { | |
// register | |
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered()); | |
if (!recoverRegistered.isEmpty()) { | |
if (logger.isInfoEnabled()) { | |
logger.info("Recover register url " + recoverRegistered); | |
} | |
for (URL url : recoverRegistered) { | |
// remove fail registry or unRegistry task first. | |
removeFailedRegistered(url); | |
removeFailedUnregistered(url); | |
addFailedRegistered(url); | |
} | |
} | |
// subscribe | |
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); | |
if (!recoverSubscribed.isEmpty()) { | |
if (logger.isInfoEnabled()) { | |
logger.info("Recover subscribe url " + recoverSubscribed.keySet()); | |
} | |
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { | |
URL url = entry.getKey(); | |
for (NotifyListener listener : entry.getValue()) { | |
// First remove other tasks to ensure that addFailedSubscribed can succeed. | |
removeFailedSubscribed(url, listener); | |
addFailedSubscribed(url, listener); | |
} | |
} | |
} | |
} | |
@Override | |
public void destroy() { | |
super.destroy(); | |
retryTimer.stop(); | |
} | |
// ==== Template method ==== | |
public abstract void doRegister(URL url); | |
public abstract void doUnregister(URL url); | |
public abstract void doSubscribe(URL url, NotifyListener listener); | |
public abstract void doUnsubscribe(URL url, NotifyListener listener); | |
static class Holder { | |
private final URL url; | |
private final NotifyListener notifyListener; | |
Holder(URL url, NotifyListener notifyListener) { | |
if (url == null || notifyListener == null) { | |
throw new IllegalArgumentException(); | |
} | |
this.url = url; | |
this.notifyListener = notifyListener; | |
} | |
@Override | |
public int hashCode() { | |
return url.hashCode() + notifyListener.hashCode(); | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (obj instanceof Holder) { | |
Holder h = (Holder) obj; | |
return this.url.equals(h.url) && this.notifyListener.equals(h.notifyListener); | |
} else { | |
return false; | |
} | |
} | |
} | |
} |