blob: c0072a7b6ab38e6424db89598259ff228d03d549 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.redis;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.RemotingConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
import org.apache.dubbo.rpc.RpcException;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.JedisSentinelPool;
import redis.clients.util.Pool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD;
import static org.apache.dubbo.registry.Constants.DEFAULT_SESSION_TIMEOUT;
import static org.apache.dubbo.registry.Constants.REGISTER;
import static org.apache.dubbo.registry.Constants.REGISTRY_RECONNECT_PERIOD_KEY;
import static org.apache.dubbo.registry.Constants.SESSION_TIMEOUT_KEY;
import static org.apache.dubbo.registry.Constants.UNREGISTER;
/**
* RedisRegistry
*/
public class RedisRegistry extends CacheableFailbackRegistry {
private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class);
private static final int DEFAULT_REDIS_PORT = 6379;
private final static String DEFAULT_ROOT = "dubbo";
private static final String REDIS_MASTER_NAME_KEY = "master-name";
private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));
private final ScheduledFuture<?> expireFuture;
private final String root;
private final Map<String, Pool<Jedis>> jedisPools = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<>();
private final int reconnectPeriod;
private final int expirePeriod;
private volatile boolean admin = false;
private boolean replicate;
public RedisRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
config.setTestWhileIdle(url.getParameter("test.while.idle", false));
if (url.getParameter("max.idle", 0) > 0) {
config.setMaxIdle(url.getParameter("max.idle", 0));
}
if (url.getParameter("min.idle", 0) > 0) {
config.setMinIdle(url.getParameter("min.idle", 0));
}
if (url.getParameter("max.active", 0) > 0) {
config.setMaxTotal(url.getParameter("max.active", 0));
}
if (url.getParameter("max.total", 0) > 0) {
config.setMaxTotal(url.getParameter("max.total", 0));
}
if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
}
if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
}
if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
}
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
}
String cluster = url.getParameter("cluster", "failover");
if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
}
replicate = "replicate".equals(cluster);
List<String> addresses = new ArrayList<>();
addresses.add(url.getAddress());
String[] backups = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]);
if (ArrayUtils.isNotEmpty(backups)) {
addresses.addAll(Arrays.asList(backups));
}
//获得Redis主节点名称
String masterName = url.getParameter(REDIS_MASTER_NAME_KEY);
if (StringUtils.isEmpty(masterName)) {
//单机版redis
for (String address : addresses) {
int i = address.indexOf(':');
String host;
int port;
if (i > 0) {
host = address.substring(0, i);
port = Integer.parseInt(address.substring(i + 1));
} else {
host = address;
port = DEFAULT_REDIS_PORT;
}
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
url.getParameter("db.index", 0)));
}
} else {
//哨兵版redis
Set<String> sentinelSet = new HashSet<>(addresses);
int index = url.getParameter("db.index", 0);
int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
String password = StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword();
JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinelSet, config, timeout, password, index);
this.jedisPools.put(masterName, pool);
}
this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, DEFAULT_REGISTRY_RECONNECT_PERIOD);
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
if (!group.endsWith(PATH_SEPARATOR)) {
group = group + PATH_SEPARATOR;
}
this.root = group;
this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
try {
deferExpired(); // Extend the expiration time
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}
private void deferExpired() {
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(DYNAMIC_KEY, true)) {
String key = toCategoryPath(url);
if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
jedis.publish(key, REGISTER);
}
}
}
if (admin) {
clean(jedis);
}
if (!replicate) {
break;//  If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
}
// The monitoring center is responsible for deleting outdated dirty data
private void clean(Jedis jedis) {
Set<String> keys = jedis.keys(root + ANY_VALUE);
if (CollectionUtils.isNotEmpty(keys)) {
for (String key : keys) {
Map<String, String> values = jedis.hgetAll(key);
if (CollectionUtils.isNotEmptyMap(values)) {
boolean delete = false;
long now = System.currentTimeMillis();
for (Map.Entry<String, String> entry : values.entrySet()) {
URL url = URL.valueOf(entry.getKey());
if (url.getParameter(DYNAMIC_KEY, true)) {
long expire = Long.parseLong(entry.getValue());
if (expire < now) {
jedis.hdel(key, entry.getKey());
delete = true;
if (logger.isWarnEnabled()) {
logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
}
}
}
}
if (delete) {
jedis.publish(key, UNREGISTER);
}
}
}
}
}
@Override
public boolean isAvailable() {
for (Pool<Jedis> jedisPool : jedisPools.values()) {
try (Jedis jedis = jedisPool.getResource()) {
if (jedis.isConnected()) {
return true; // At least one single machine is available.
}
} catch (Throwable t) {
}
}
return false;
}
@Override
public void destroy() {
super.destroy();
try {
expireFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
for (Notifier notifier : notifiers.values()) {
notifier.shutdown();
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
jedisPool.destroy();
} catch (Throwable t) {
logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
}
@Override
public void doRegister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
boolean success = false;
RpcException exception = null;
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
jedis.hset(key, value, expire);
jedis.publish(key, REGISTER);
success = true;
if (!replicate) {
break; //  If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
@Override
public void doUnregister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
RpcException exception = null;
boolean success = false;
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
jedis.hdel(key, value);
jedis.publish(key, UNREGISTER);
success = true;
if (!replicate) {
break; //  If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
String service = toServicePath(url);
Notifier notifier = notifiers.get(service);
if (notifier == null) {
Notifier newNotifier = new Notifier(service);
notifiers.putIfAbsent(service, newNotifier);
notifier = notifiers.get(service);
if (notifier == newNotifier) {
notifier.start();
}
}
boolean success = false;
RpcException exception = null;
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
if (service.endsWith(ANY_VALUE)) {
admin = true;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
Map<String, Set<String>> serviceKeys = new HashMap<>();
for (String key : keys) {
String serviceKey = toServicePath(key);
Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
sk.add(key);
}
for (Set<String> sk : serviceKeys.values()) {
doNotify(jedis, sk, url, Collections.singletonList(listener));
}
}
} else {
doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
}
success = true;
break; // Just read one server's data
}
} catch (Throwable t) { // Try the next server
exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
}
private void doNotify(Jedis jedis, String key) {
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
}
}
private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
if (keys == null || keys.isEmpty()
|| listeners == null || listeners.isEmpty()) {
return;
}
long now = System.currentTimeMillis();
List<URL> result = new ArrayList<>();
List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0]));
String consumerService = url.getServiceInterface();
for (String key : keys) {
if (!ANY_VALUE.equals(consumerService)) {
String providerService = toServiceName(key);
if (!providerService.equals(consumerService)) {
continue;
}
}
String category = toCategoryName(key);
if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
continue;
}
Map<String, String> values = jedis.hgetAll(key);
List<String> rawUrls = new ArrayList<>(values.size());
if (CollectionUtils.isNotEmptyMap(values)) {
for (Map.Entry<String, String> entry : values.entrySet()) {
if (Long.parseLong(entry.getValue()) >= now) {
rawUrls.add(entry.getKey());
}
}
}
List<URL> urls = toUrlsWithEmpty(url, category, rawUrls);
result.addAll(urls);
if (logger.isInfoEnabled()) {
logger.info("redis notify: " + key + " = " + urls);
}
}
if (CollectionUtils.isEmpty(result)) {
return;
}
for (NotifyListener listener : listeners) {
notify(url, listener, result);
}
}
@Override
protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
return !providerUrl.getParameter(DYNAMIC_KEY, true) && UrlUtils.isMatch(subscribeUrl, providerUrl);
}
private String toServiceName(String categoryPath) {
String servicePath = toServicePath(categoryPath);
return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
}
private String toCategoryName(String categoryPath) {
int i = categoryPath.lastIndexOf(PATH_SEPARATOR);
return i > 0 ? categoryPath.substring(i + 1) : categoryPath;
}
private String toServicePath(String categoryPath) {
int i;
if (categoryPath.startsWith(root)) {
i = categoryPath.indexOf(PATH_SEPARATOR, root.length());
} else {
i = categoryPath.indexOf(PATH_SEPARATOR);
}
return i > 0 ? categoryPath.substring(0, i) : categoryPath;
}
private String toServicePath(URL url) {
return root + url.getServiceInterface();
}
private String toCategoryPath(URL url) {
return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
}
private class NotifySub extends JedisPubSub {
private final Pool<Jedis> jedisPool;
public NotifySub(Pool<Jedis> jedisPool) {
this.jedisPool = jedisPool;
}
@Override
public void onMessage(String key, String msg) {
if (logger.isInfoEnabled()) {
logger.info("redis event: " + key + " = " + msg);
}
if (msg.equals(REGISTER)
|| msg.equals(UNREGISTER)) {
try {
Jedis jedis = jedisPool.getResource();
try {
doNotify(jedis, key);
} finally {
jedis.close();
}
} catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
logger.error(t.getMessage(), t);
}
}
}
@Override
public void onPMessage(String pattern, String key, String msg) {
onMessage(key, msg);
}
@Override
public void onSubscribe(String key, int num) {
}
@Override
public void onPSubscribe(String pattern, int num) {
}
@Override
public void onUnsubscribe(String key, int num) {
}
@Override
public void onPUnsubscribe(String pattern, int num) {
}
}
private class Notifier extends Thread {
private final String service;
private final AtomicInteger connectSkip = new AtomicInteger();
private final AtomicInteger connectSkipped = new AtomicInteger();
private volatile Jedis jedis;
private volatile boolean first = true;
private volatile boolean running = true;
private volatile int connectRandom;
public Notifier(String service) {
super.setDaemon(true);
super.setName("DubboRedisSubscribe");
this.service = service;
}
private void resetSkip() {
connectSkip.set(0);
connectSkipped.set(0);
connectRandom = 0;
}
private boolean isSkip() {
int skip = connectSkip.get(); // Growth of skipping times
if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
if (connectRandom == 0) {
connectRandom = ThreadLocalRandom.current().nextInt(10);
}
skip = 10 + connectRandom;
}
if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
return true;
}
connectSkip.incrementAndGet();
connectSkipped.set(0);
connectRandom = 0;
return false;
}
@Override
public void run() {
while (running) {
try {
if (!isSkip()) {
try {
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
if (jedisPool.isClosed()) {
continue;
}
jedis = jedisPool.getResource();
if (!jedis.isConnected()) {
continue;
}
try {
if (service.endsWith(ANY_VALUE)) {
if (first) {
first = false;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
for (String s : keys) {
doNotify(jedis, s);
}
}
resetSkip();
}
jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
} else {
if (first) {
first = false;
doNotify(jedis, service);
resetSkip();
}
jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); // blocking
}
break;
} finally {
jedis.close();
}
} catch (Throwable t) { // Retry another server
logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
// If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
public void shutdown() {
try {
running = false;
jedis.disconnect();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}