| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.registry.support; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.timer.HashedWheelTimer; |
| import org.apache.dubbo.common.utils.CollectionUtils; |
| import org.apache.dubbo.common.utils.NamedThreadFactory; |
| import org.apache.dubbo.registry.NotifyListener; |
| import org.apache.dubbo.registry.retry.FailedRegisteredTask; |
| import org.apache.dubbo.registry.retry.FailedSubscribedTask; |
| import org.apache.dubbo.registry.retry.FailedUnregisteredTask; |
| import org.apache.dubbo.registry.retry.FailedUnsubscribedTask; |
| import org.apache.dubbo.remoting.Constants; |
| |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RETRY_PERIOD; |
| import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY; |
| |
| /** |
| * A template implementation of registry service that provides auto-retry ability. |
| * (SPI, Prototype, ThreadSafe) |
| */ |
| public abstract class FailbackRegistry extends AbstractRegistry { |
| |
| /* retry task map */ |
| |
| private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<>(); |
| |
| private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<>(); |
| |
| private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<>(); |
| |
| private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<>(); |
| |
| /** |
| * The time in milliseconds the retryExecutor will wait |
| */ |
| private final int retryPeriod; |
| |
| // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry |
| private final HashedWheelTimer retryTimer; |
| |
| public FailbackRegistry(URL url) { |
| super(url); |
| this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD); |
| |
| // since the retry task will not be very much. 128 ticks is enough. |
| retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); |
| } |
| |
| public void removeFailedRegisteredTask(URL url) { |
| failedRegistered.remove(url); |
| } |
| |
| public void removeFailedUnregisteredTask(URL url) { |
| failedUnregistered.remove(url); |
| } |
| |
| public void removeFailedSubscribedTask(URL url, NotifyListener listener) { |
| Holder h = new Holder(url, listener); |
| failedSubscribed.remove(h); |
| } |
| |
| public void removeFailedUnsubscribedTask(URL url, NotifyListener listener) { |
| Holder h = new Holder(url, listener); |
| failedUnsubscribed.remove(h); |
| } |
| |
| private void addFailedRegistered(URL url) { |
| FailedRegisteredTask oldOne = failedRegistered.get(url); |
| if (oldOne != null) { |
| return; |
| } |
| FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); |
| oldOne = failedRegistered.putIfAbsent(url, newTask); |
| if (oldOne == null) { |
| // never has a retry task. then start a new task for retry. |
| retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| private void removeFailedRegistered(URL url) { |
| FailedRegisteredTask f = failedRegistered.remove(url); |
| if (f != null) { |
| f.cancel(); |
| } |
| } |
| |
| private void addFailedUnregistered(URL url) { |
| FailedUnregisteredTask oldOne = failedUnregistered.get(url); |
| if (oldOne != null) { |
| return; |
| } |
| FailedUnregisteredTask newTask = new FailedUnregisteredTask(url, this); |
| oldOne = failedUnregistered.putIfAbsent(url, newTask); |
| if (oldOne == null) { |
| // never has a retry task. then start a new task for retry. |
| retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| private void removeFailedUnregistered(URL url) { |
| FailedUnregisteredTask f = failedUnregistered.remove(url); |
| if (f != null) { |
| f.cancel(); |
| } |
| } |
| |
| protected void addFailedSubscribed(URL url, NotifyListener listener) { |
| Holder h = new Holder(url, listener); |
| FailedSubscribedTask oldOne = failedSubscribed.get(h); |
| if (oldOne != null) { |
| return; |
| } |
| FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener); |
| oldOne = failedSubscribed.putIfAbsent(h, newTask); |
| if (oldOne == null) { |
| // never has a retry task. then start a new task for retry. |
| retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| public void removeFailedSubscribed(URL url, NotifyListener listener) { |
| Holder h = new Holder(url, listener); |
| FailedSubscribedTask f = failedSubscribed.remove(h); |
| if (f != null) { |
| f.cancel(); |
| } |
| removeFailedUnsubscribed(url, listener); |
| } |
| |
| private void addFailedUnsubscribed(URL url, NotifyListener listener) { |
| Holder h = new Holder(url, listener); |
| FailedUnsubscribedTask oldOne = failedUnsubscribed.get(h); |
| if (oldOne != null) { |
| return; |
| } |
| FailedUnsubscribedTask newTask = new FailedUnsubscribedTask(url, this, listener); |
| oldOne = failedUnsubscribed.putIfAbsent(h, newTask); |
| if (oldOne == null) { |
| // never has a retry task. then start a new task for retry. |
| retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| private void removeFailedUnsubscribed(URL url, NotifyListener listener) { |
| Holder h = new Holder(url, listener); |
| FailedUnsubscribedTask f = failedUnsubscribed.remove(h); |
| if (f != null) { |
| f.cancel(); |
| } |
| } |
| |
| ConcurrentMap<URL, FailedRegisteredTask> getFailedRegistered() { |
| return failedRegistered; |
| } |
| |
| ConcurrentMap<URL, FailedUnregisteredTask> getFailedUnregistered() { |
| return failedUnregistered; |
| } |
| |
| ConcurrentMap<Holder, FailedSubscribedTask> getFailedSubscribed() { |
| return failedSubscribed; |
| } |
| |
| ConcurrentMap<Holder, FailedUnsubscribedTask> getFailedUnsubscribed() { |
| return failedUnsubscribed; |
| } |
| |
| |
| @Override |
| public void register(URL url) { |
| if (!acceptable(url)) { |
| logger.info("URL " + url + " will not be registered to Registry. Registry " + this.getUrl() + " does not accept service of this protocol type."); |
| return; |
| } |
| super.register(url); |
| removeFailedRegistered(url); |
| removeFailedUnregistered(url); |
| try { |
| // Sending a registration request to the server side |
| doRegister(url); |
| } catch (Exception e) { |
| Throwable t = e; |
| |
| // If the startup detection is opened, the Exception is thrown directly. |
| boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) |
| && url.getParameter(Constants.CHECK_KEY, true) |
| && (url.getPort() != 0); |
| boolean skipFailback = t instanceof SkipFailbackWrapperException; |
| if (check || skipFailback) { |
| if (skipFailback) { |
| t = t.getCause(); |
| } |
| throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); |
| } else { |
| logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); |
| } |
| |
| // Record a failed registration request to a failed list, retry regularly |
| addFailedRegistered(url); |
| } |
| } |
| |
| @Override |
| public void reExportRegister(URL url) { |
| if (!acceptable(url)) { |
| logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type."); |
| return; |
| } |
| super.register(url); |
| removeFailedRegistered(url); |
| removeFailedUnregistered(url); |
| try { |
| // Sending a registration request to the server side |
| doRegister(url); |
| } catch (Exception e) { |
| if (!(e instanceof SkipFailbackWrapperException)) { |
| throw new IllegalStateException("Failed to register (re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e); |
| } |
| } |
| } |
| |
| @Override |
| public void unregister(URL url) { |
| super.unregister(url); |
| removeFailedRegistered(url); |
| removeFailedUnregistered(url); |
| try { |
| // Sending a cancellation request to the server side |
| doUnregister(url); |
| } catch (Exception e) { |
| Throwable t = e; |
| |
| // If the startup detection is opened, the Exception is thrown directly. |
| boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) |
| && url.getParameter(Constants.CHECK_KEY, true) |
| && (url.getPort() != 0); |
| boolean skipFailback = t instanceof SkipFailbackWrapperException; |
| if (check || skipFailback) { |
| if (skipFailback) { |
| t = t.getCause(); |
| } |
| throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); |
| } else { |
| logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t); |
| } |
| |
| // Record a failed registration request to a failed list, retry regularly |
| addFailedUnregistered(url); |
| } |
| } |
| |
| @Override |
| public void reExportUnregister(URL url) { |
| super.unregister(url); |
| removeFailedRegistered(url); |
| removeFailedUnregistered(url); |
| try { |
| // Sending a cancellation request to the server side |
| doUnregister(url); |
| } catch (Exception e) { |
| if (!(e instanceof SkipFailbackWrapperException)) { |
| throw new IllegalStateException("Failed to unregister(re-export) " + url + " to registry " + getUrl().getAddress() + ", cause: " + e.getMessage(), e); |
| } |
| } |
| } |
| |
| @Override |
| public void subscribe(URL url, NotifyListener listener) { |
| super.subscribe(url, listener); |
| removeFailedSubscribed(url, listener); |
| try { |
| // Sending a subscription request to the server side |
| doSubscribe(url, listener); |
| } catch (Exception e) { |
| Throwable t = e; |
| |
| List<URL> urls = getCacheUrls(url); |
| if (CollectionUtils.isNotEmpty(urls)) { |
| notify(url, listener, urls); |
| logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getCacheFile().getName() + ", cause: " + t.getMessage(), t); |
| } else { |
| // If the startup detection is opened, the Exception is thrown directly. |
| boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) |
| && url.getParameter(Constants.CHECK_KEY, true); |
| boolean skipFailback = t instanceof SkipFailbackWrapperException; |
| if (check || skipFailback) { |
| if (skipFailback) { |
| t = t.getCause(); |
| } |
| throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); |
| } else { |
| logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); |
| } |
| } |
| |
| // Record a failed registration request to a failed list, retry regularly |
| addFailedSubscribed(url, listener); |
| } |
| } |
| |
| @Override |
| public void unsubscribe(URL url, NotifyListener listener) { |
| super.unsubscribe(url, listener); |
| removeFailedSubscribed(url, listener); |
| try { |
| // Sending a canceling subscription request to the server side |
| doUnsubscribe(url, listener); |
| } catch (Exception e) { |
| Throwable t = e; |
| |
| // If the startup detection is opened, the Exception is thrown directly. |
| boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) |
| && url.getParameter(Constants.CHECK_KEY, true); |
| boolean skipFailback = t instanceof SkipFailbackWrapperException; |
| if (check || skipFailback) { |
| if (skipFailback) { |
| t = t.getCause(); |
| } |
| throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); |
| } else { |
| logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); |
| } |
| |
| // Record a failed registration request to a failed list, retry regularly |
| addFailedUnsubscribed(url, listener); |
| } |
| } |
| |
| @Override |
| protected void notify(URL url, NotifyListener listener, List<URL> urls) { |
| if (url == null) { |
| throw new IllegalArgumentException("notify url == null"); |
| } |
| if (listener == null) { |
| throw new IllegalArgumentException("notify listener == null"); |
| } |
| try { |
| doNotify(url, listener, urls); |
| } catch (Exception t) { |
| // Record a failed registration request to a failed list |
| logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t); |
| } |
| } |
| |
| protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { |
| super.notify(url, listener, urls); |
| } |
| |
| @Override |
| protected void recover() throws Exception { |
| // register |
| Set<URL> recoverRegistered = new HashSet<>(getRegistered()); |
| if (!recoverRegistered.isEmpty()) { |
| if (logger.isInfoEnabled()) { |
| logger.info("Recover register url " + recoverRegistered); |
| } |
| for (URL url : recoverRegistered) { |
| // remove fail registry or unRegistry task first. |
| removeFailedRegistered(url); |
| removeFailedUnregistered(url); |
| addFailedRegistered(url); |
| } |
| } |
| // subscribe |
| Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<>(getSubscribed()); |
| if (!recoverSubscribed.isEmpty()) { |
| if (logger.isInfoEnabled()) { |
| logger.info("Recover subscribe url " + recoverSubscribed.keySet()); |
| } |
| for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) { |
| URL url = entry.getKey(); |
| for (NotifyListener listener : entry.getValue()) { |
| // First remove other tasks to ensure that addFailedSubscribed can succeed. |
| removeFailedSubscribed(url, listener); |
| addFailedSubscribed(url, listener); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void destroy() { |
| super.destroy(); |
| retryTimer.stop(); |
| } |
| |
| // ==== Template method ==== |
| |
| public abstract void doRegister(URL url); |
| |
| public abstract void doUnregister(URL url); |
| |
| public abstract void doSubscribe(URL url, NotifyListener listener); |
| |
| public abstract void doUnsubscribe(URL url, NotifyListener listener); |
| |
| static class Holder { |
| |
| private final URL url; |
| |
| private final NotifyListener notifyListener; |
| |
| Holder(URL url, NotifyListener notifyListener) { |
| if (url == null || notifyListener == null) { |
| throw new IllegalArgumentException(); |
| } |
| this.url = url; |
| this.notifyListener = notifyListener; |
| } |
| |
| @Override |
| public int hashCode() { |
| return url.hashCode() + notifyListener.hashCode(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj instanceof Holder) { |
| Holder h = (Holder) obj; |
| return this.url.equals(h.url) && this.notifyListener.equals(h.notifyListener); |
| } else { |
| return false; |
| } |
| } |
| } |
| } |