blob: 6763543c7c0d30511e8de504320ce3220d435dea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.multicast;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.rpc.model.ApplicationModel;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_SOCKET_EXCEPTION;
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.OVERRIDE_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTE_PROTOCOL;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.DEFAULT_SESSION_TIMEOUT;
import static org.apache.dubbo.registry.Constants.REGISTER;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.registry.Constants.SESSION_TIMEOUT_KEY;
import static org.apache.dubbo.registry.Constants.SUBSCRIBE;
import static org.apache.dubbo.registry.Constants.UNREGISTER;
import static org.apache.dubbo.registry.Constants.UNSUBSCRIBE;
/**
* MulticastRegistry
*/
public class MulticastRegistry extends FailbackRegistry {
// logging output
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(MulticastRegistry.class);
private static final int DEFAULT_MULTICAST_PORT = 1234;
private final InetAddress multicastAddress;
private final MulticastSocket multicastSocket;
private final int multicastPort;
private final ConcurrentMap<URL, Set<URL>> received = new ConcurrentHashMap<>();
private final ScheduledExecutorService cleanExecutor =
Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboMulticastRegistryCleanTimer", true));
private final ScheduledFuture<?> cleanFuture;
private final int cleanPeriod;
private volatile boolean admin = false;
public MulticastRegistry(URL url, ApplicationModel applicationModel) {
this(url);
}
public MulticastRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
try {
multicastAddress = InetAddress.getByName(url.getHost());
checkMulticastAddress(multicastAddress);
multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
multicastSocket = new MulticastSocket(multicastPort);
NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
Thread thread = new Thread(
() -> {
byte[] buf = new byte[2048];
DatagramPacket recv = new DatagramPacket(buf, buf.length);
while (!multicastSocket.isClosed()) {
try {
multicastSocket.receive(recv);
String msg = new String(recv.getData()).trim();
int i = msg.indexOf('\n');
if (i > 0) {
msg = msg.substring(0, i).trim();
}
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
Arrays.fill(buf, (byte) 0);
} catch (Throwable e) {
if (!multicastSocket.isClosed()) {
logger.error(REGISTRY_SOCKET_EXCEPTION, "", "", e.getMessage(), e);
}
}
}
},
"DubboMulticastRegistryReceiver");
thread.setDaemon(true);
thread.start();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
this.cleanPeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
if (url.getParameter("clean", true)) {
this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(
() -> {
try {
clean(); // Remove the expired
} catch (Throwable t) { // Defensive fault tolerance
logger.error(
REGISTRY_SOCKET_EXCEPTION,
"",
"",
"Unexpected exception occur at clean expired provider, cause: " + t.getMessage(),
t);
}
},
cleanPeriod,
cleanPeriod,
TimeUnit.MILLISECONDS);
} else {
this.cleanFuture = null;
}
}
private void checkMulticastAddress(InetAddress multicastAddress) {
if (!multicastAddress.isMulticastAddress()) {
String message = "Invalid multicast address " + multicastAddress;
if (multicastAddress instanceof Inet4Address) {
throw new IllegalArgumentException(
message + ", " + "ipv4 multicast address scope: 224.0.0.0 - 239.255.255.255.");
} else {
throw new IllegalArgumentException(
message + ", " + "ipv6 multicast address must start with ff, " + "for example: ff01::1");
}
}
}
/**
* Remove the expired providers, only when "clean" parameter is true.
*/
private void clean() {
if (admin) {
for (Set<URL> providers : new HashSet<Set<URL>>(received.values())) {
for (URL url : new HashSet<URL>(providers)) {
if (isExpired(url)) {
if (logger.isWarnEnabled()) {
logger.warn(REGISTRY_SOCKET_EXCEPTION, "", "", "Clean expired provider " + url);
}
doUnregister(url);
}
}
}
}
}
private boolean isExpired(URL url) {
if (!url.getParameter(DYNAMIC_KEY, true)
|| url.getPort() <= 0
|| CONSUMER_PROTOCOL.equals(url.getProtocol())
|| ROUTE_PROTOCOL.equals(url.getProtocol())
|| OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
return false;
}
try (Socket socket = new Socket(url.getHost(), url.getPort())) {
} catch (Throwable e) {
try {
Thread.sleep(100);
} catch (Throwable e2) {
}
try (Socket socket2 = new Socket(url.getHost(), url.getPort())) {
} catch (Throwable e2) {
return true;
}
}
return false;
}
private void receive(String msg, InetSocketAddress remoteAddress) {
if (logger.isInfoEnabled()) {
logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
}
if (msg.startsWith(REGISTER)) {
URL url = URL.valueOf(msg.substring(REGISTER.length()).trim());
registered(url);
} else if (msg.startsWith(UNREGISTER)) {
URL url = URL.valueOf(msg.substring(UNREGISTER.length()).trim());
unregistered(url);
} else if (msg.startsWith(SUBSCRIBE)) {
URL url = URL.valueOf(msg.substring(SUBSCRIBE.length()).trim());
Set<URL> urls = getRegistered();
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String host = remoteAddress != null && remoteAddress.getAddress() != null
? remoteAddress.getAddress().getHostAddress()
: url.getIp();
if (url.getParameter("unicast", true) // Whether the consumer's machine has only one process
&& !NetUtils.getLocalHost()
.equals(host)) { // Multiple processes in the same machine cannot be unicast
// with unicast or there will be only one process receiving
// information
unicast(REGISTER + " " + u.toFullString(), host);
} else {
multicast(REGISTER + " " + u.toFullString());
}
}
}
}
} /* else if (msg.startsWith(UNSUBSCRIBE)) {
}*/
}
private void multicast(String msg) {
if (logger.isInfoEnabled()) {
logger.info("Send multicast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
multicastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
private void unicast(String msg, String host) {
if (logger.isInfoEnabled()) {
logger.info("Send unicast message: " + msg + " to " + host + ":" + multicastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), multicastPort);
multicastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public void doRegister(URL url) {
multicast(REGISTER + " " + url.toFullString());
}
@Override
public void doUnregister(URL url) {
multicast(UNREGISTER + " " + url.toFullString());
}
@Override
public void doSubscribe(URL url, final NotifyListener listener) {
if (ANY_VALUE.equals(url.getServiceInterface())) {
admin = true;
}
multicast(SUBSCRIBE + " " + url.toFullString());
synchronized (listener) {
try {
listener.wait(url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT));
} catch (InterruptedException e) {
}
}
}
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
unregister(url);
}
multicast(UNSUBSCRIBE + " " + url.toFullString());
}
@Override
public boolean isAvailable() {
try {
return multicastSocket != null;
} catch (Throwable t) {
return false;
}
}
/**
* Remove the expired providers(if clean is true), leave the multicast group and close the multicast socket.
*/
@Override
public void destroy() {
super.destroy();
try {
ExecutorUtil.cancelScheduledFuture(cleanFuture);
} catch (Throwable t) {
logger.warn(REGISTRY_SOCKET_EXCEPTION, "", "", t.getMessage(), t);
}
try {
multicastSocket.leaveGroup(multicastAddress);
multicastSocket.close();
} catch (Throwable t) {
logger.warn(REGISTRY_SOCKET_EXCEPTION, "", "", t.getMessage(), t);
}
ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}
protected void registered(URL url) {
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
Set<URL> urls = received.computeIfAbsent(key, k -> new ConcurrentHashSet<>());
urls.add(url);
List<URL> list = toList(urls);
for (final NotifyListener listener : entry.getValue()) {
notify(key, listener, list);
synchronized (listener) {
listener.notify();
}
}
}
}
}
protected void unregistered(URL url) {
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
Set<URL> urls = received.get(key);
if (urls != null) {
urls.remove(url);
}
if (urls == null || urls.isEmpty()) {
if (urls == null) {
urls = new ConcurrentHashSet<>();
}
URL empty = url.setProtocol(EMPTY_PROTOCOL);
urls.add(empty);
}
List<URL> list = toList(urls);
for (NotifyListener listener : entry.getValue()) {
notify(key, listener, list);
}
}
}
}
protected void subscribed(URL url, NotifyListener listener) {
List<URL> urls = lookup(url);
notify(url, listener, urls);
}
private List<URL> toList(Set<URL> urls) {
List<URL> list = new ArrayList<>();
if (CollectionUtils.isNotEmpty(urls)) {
list.addAll(urls);
}
return list;
}
@Override
public void register(URL url) {
super.register(url);
registered(url);
}
@Override
public void unregister(URL url) {
super.unregister(url);
unregistered(url);
}
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
subscribed(url, listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
super.unsubscribe(url, listener);
received.remove(url);
}
@Override
public List<URL> lookup(URL url) {
List<URL> urls = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> values : notifiedUrls.values()) {
urls.addAll(values);
}
}
if (urls.isEmpty()) {
List<URL> cacheUrls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(cacheUrls)) {
urls.addAll(cacheUrls);
}
}
if (urls.isEmpty()) {
for (URL u : getRegistered()) {
if (UrlUtils.isMatch(url, u)) {
urls.add(u);
}
}
}
if (ANY_VALUE.equals(url.getServiceInterface())) {
for (URL u : getSubscribed().keySet()) {
if (UrlUtils.isMatch(url, u)) {
urls.add(u);
}
}
}
return urls;
}
public MulticastSocket getMulticastSocket() {
return multicastSocket;
}
public Map<URL, Set<URL>> getReceived() {
return received;
}
}