blob: f920755ec1bc436ab2f600c365a3d9b19a717f62 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.support;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.retry.FailedRegisteredTask;
import org.apache.dubbo.registry.retry.FailedSubscribedTask;
import org.apache.dubbo.registry.retry.FailedUnregisteredTask;
import org.apache.dubbo.registry.retry.FailedUnsubscribedTask;
import org.apache.dubbo.remoting.Constants;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.FILE_KEY;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RETRY_PERIOD;
import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY;
/**
* FailbackRegistry. (SPI, Prototype, ThreadSafe)
*/
public abstract class FailbackRegistry extends AbstractRegistry {
/* retry task map */
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
/**
* The time in milliseconds the retryExecutor will wait
*/
private final int retryPeriod;
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
private final HashedWheelTimer retryTimer;
public FailbackRegistry(URL url) {
super(url);
this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
// since the retry task will not be very much. 128 ticks is enough.
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
public void removeFailedRegisteredTask(URL url) {
failedRegistered.remove(url);
}
public void removeFailedUnregisteredTask(URL url) {
failedUnregistered.remove(url);
}
public void removeFailedSubscribedTask(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
failedSubscribed.remove(h);
}
public void removeFailedUnsubscribedTask(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
failedUnsubscribed.remove(h);
}
private void addFailedRegistered(URL url) {
FailedRegisteredTask oldOne = failedRegistered.get(url);
if (oldOne != null) {
return;
}
FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
oldOne = failedRegistered.putIfAbsent(url, newTask);
if (oldOne == null) {
// never has a retry task. then start a new task for retry.
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
private void removeFailedRegistered(URL url) {
FailedRegisteredTask f = failedRegistered.remove(url);
if (f != null) {
f.cancel();
}
}
private void addFailedUnregistered(URL url) {
FailedUnregisteredTask oldOne = failedUnregistered.get(url);
if (oldOne != null) {
return;
}
FailedUnregisteredTask newTask = new FailedUnregisteredTask(url, this);
oldOne = failedUnregistered.putIfAbsent(url, newTask);
if (oldOne == null) {
// never has a retry task. then start a new task for retry.
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
private void removeFailedUnregistered(URL url) {
FailedUnregisteredTask f = failedUnregistered.remove(url);
if (f != null) {
f.cancel();
}
}
protected void addFailedSubscribed(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
FailedSubscribedTask oldOne = failedSubscribed.get(h);
if (oldOne != null) {
return;
}
FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener);
oldOne = failedSubscribed.putIfAbsent(h, newTask);
if (oldOne == null) {
// never has a retry task. then start a new task for retry.
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
public void removeFailedSubscribed(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
FailedSubscribedTask f = failedSubscribed.remove(h);
if (f != null) {
f.cancel();
}
removeFailedUnsubscribed(url, listener);
}
private void addFailedUnsubscribed(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
FailedUnsubscribedTask oldOne = failedUnsubscribed.get(h);
if (oldOne != null) {
return;
}
FailedUnsubscribedTask newTask = new FailedUnsubscribedTask(url, this, listener);
oldOne = failedUnsubscribed.putIfAbsent(h, newTask);
if (oldOne == null) {
// never has a retry task. then start a new task for retry.
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
private void removeFailedUnsubscribed(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
FailedUnsubscribedTask f = failedUnsubscribed.remove(h);
if (f != null) {
f.cancel();
}
}
ConcurrentMap<URL, FailedRegisteredTask> getFailedRegistered() {
return failedRegistered;
}
ConcurrentMap<URL, FailedUnregisteredTask> getFailedUnregistered() {
return failedUnregistered;
}
ConcurrentMap<Holder, FailedSubscribedTask> getFailedSubscribed() {
return failedSubscribed;
}
ConcurrentMap<Holder, FailedUnsubscribedTask> getFailedUnsubscribed() {
return failedUnsubscribed;
}
@Override
public void register(URL url) {
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
@Override
public void reExportRegister(URL url) {
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
if (!(e instanceof SkipFailbackWrapperException)) {
throw new IllegalStateException("Failed to register (re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e);
}
}
}
@Override
public void unregister(URL url) {
super.unregister(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a cancellation request to the server side
doUnregister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedUnregistered(url);
}
}
@Override
public void reExportUnregister(URL url) {
super.unregister(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a cancellation request to the server side
doUnregister(url);
} catch (Exception e) {
if (!(e instanceof SkipFailbackWrapperException)) {
throw new IllegalStateException("Failed to unregister(re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e);
}
}
}
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
super.unsubscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a canceling subscription request to the server side
doUnsubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedUnsubscribed(url, listener);
}
}
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list
logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
@Override
protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
// remove fail registry or unRegistry task first.
removeFailedRegistered(url);
removeFailedUnregistered(url);
addFailedRegistered(url);
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// First remove other tasks to ensure that addFailedSubscribed can succeed.
removeFailedSubscribed(url, listener);
addFailedSubscribed(url, listener);
}
}
}
}
@Override
public void destroy() {
super.destroy();
retryTimer.stop();
}
// ==== Template method ====
public abstract void doRegister(URL url);
public abstract void doUnregister(URL url);
public abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);
static class Holder {
private final URL url;
private final NotifyListener notifyListener;
Holder(URL url, NotifyListener notifyListener) {
if (url == null || notifyListener == null) {
throw new IllegalArgumentException();
}
this.url = url;
this.notifyListener = notifyListener;
}
@Override
public int hashCode() {
return url.hashCode() + notifyListener.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Holder) {
Holder h = (Holder) obj;
return this.url.equals(h.url) && this.notifyListener.equals(h.notifyListener);
} else {
return false;
}
}
}
}