blob: 251d35f50e1f566b2e0cab7896444afa563db00b [file] [log] [blame]
/*
* Copyright 1999-2012 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.registry.redis;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.pool.impl.GenericObjectPool;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.utils.UrlUtils;
import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.support.FailbackRegistry;
/**
* RedisRegistry
*
* @author william.liangf
*/
public class RedisRegistry extends FailbackRegistry {
private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class);
private static final int DEFAULT_REDIS_PORT = 6379;
private final static String DEFAULT_ROOT = "dubbo";
private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));
private final ScheduledFuture<?> expireFuture;
private final String root;
private final JedisPool jedisPool;
private final NotifySub sub = new NotifySub();
private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>();
private final int reconnectPeriod;
private final int expirePeriod;
private volatile boolean admin = false;
public RedisRegistry(URL url) {
super(url);
GenericObjectPool.Config config = new GenericObjectPool.Config();
config.testOnBorrow = url.getParameter("test.on.borrow", true);
config.testOnReturn = url.getParameter("test.on.return", false);
config.testWhileIdle = url.getParameter("test.while.idle", false);
if (url.getParameter("max.idle", 0) > 0)
config.maxIdle = url.getParameter("max.idle", 0);
if (url.getParameter("min.idle", 0) > 0)
config.minIdle = url.getParameter("min.idle", 0);
if (url.getParameter("max.active", 0) > 0)
config.maxActive = url.getParameter("max.active", 0);
if (url.getParameter("max.wait", 0) > 0)
config.maxWait = url.getParameter("max.wait", 0);
if (url.getParameter("num.tests.per.eviction.run", 0) > 0)
config.numTestsPerEvictionRun = url.getParameter("num.tests.per.eviction.run", 0);
if (url.getParameter("time.between.eviction.runs.millis", 0) > 0)
config.timeBetweenEvictionRunsMillis = url.getParameter("time.between.eviction.runs.millis", 0);
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
config.minEvictableIdleTimeMillis = url.getParameter("min.evictable.idle.time.millis", 0);
this.jedisPool = new JedisPool(config, url.getHost(),
url.getPort() == 0 ? DEFAULT_REDIS_PORT : url.getPort(),
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (! group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
if (! group.endsWith(Constants.PATH_SEPARATOR)) {
group = group + Constants.PATH_SEPARATOR;
}
this.root = group;
this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
deferExpired(); // 延长过期时间
} catch (Throwable t) { // 防御性容错
logger.error("Unexpected error occur at defer expire time, cause: " + t.getMessage(), t);
}
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}
private void deferExpired() {
Jedis jedis = jedisPool.getResource();
try {
for (String provider : new HashSet<String>(getRegistered())) {
String key = toProviderPath(URL.valueOf(provider));
if (jedis.hset(key, provider, String.valueOf(System.currentTimeMillis() + expirePeriod)) == 0) {
jedis.publish(key, Constants.REGISTER);
}
}
for (String consumer : new HashSet<String>(getSubscribed().keySet())) {
URL url = URL.valueOf(consumer);
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String key = toConsumerPath(url);
if (jedis.hset(key, consumer, String.valueOf(System.currentTimeMillis() + expirePeriod)) == 0) {
jedis.publish(key, Constants.SUBSCRIBE);
}
}
}
if (admin) {
clean(jedis);
}
} finally {
jedisPool.returnResource(jedis);
}
}
// 监控中心负责删除过期脏数据
private void clean(Jedis jedis) {
Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);
if (keys != null && keys.size() > 0) {
for (String key : keys) {
Map<String, String> values = jedis.hgetAll(key);
if (values != null && values.size() > 0) {
boolean delete = false;
long now = System.currentTimeMillis();
for (Map.Entry<String, String> entry : values.entrySet()) {
long expire = Long.parseLong(entry.getValue());
if (expire < now) {
jedis.hdel(key, entry.getKey());
delete = true;
if (logger.isWarnEnabled()) {
logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
}
}
}
if (delete) {
if (key.endsWith(Constants.CONSUMERS)) {
jedis.publish(key, Constants.UNSUBSCRIBE);
} else {
jedis.publish(key, Constants.UNREGISTER);
}
}
}
}
}
}
public boolean isAvailable() {
try {
Jedis jedis = jedisPool.getResource();
try {
return jedis.isConnected();
} finally {
jedisPool.returnResource(jedis);
}
} catch (Throwable t) {
return false;
}
}
@Override
public void destroy() {
super.destroy();
try {
expireFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
for (Notifier notifier : notifiers.values()) {
notifier.shutdown();
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
jedisPool.destroy();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
@Override
public void doRegister(URL url) {
String key = toProviderPath(url);
String value = url.toFullString();
String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
Jedis jedis = jedisPool.getResource();
try {
jedis.hset(key, value, expire);
jedis.publish(key, Constants.REGISTER);
} finally {
jedisPool.returnResource(jedis);
}
}
@Override
public void doUnregister(URL url) {
String key = toProviderPath(url);
String value = url.toFullString();
Jedis jedis = jedisPool.getResource();
try {
jedis.hdel(key, value);
jedis.publish(key, Constants.UNREGISTER);
} finally {
jedisPool.returnResource(jedis);
}
}
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
String service = toServicePath(url);
Notifier notifier = notifiers.get(service);
if (notifier == null) {
Notifier newNotifier = new Notifier(service);
notifiers.putIfAbsent(service, newNotifier);
notifier = notifiers.get(service);
if (notifier == newNotifier) {
notifier.start();
}
}
Jedis jedis = jedisPool.getResource();
try {
if (service.endsWith(Constants.ANY_VALUE)) {
admin = true;
for (String s : getServices(jedis, service)) {
doNotify(jedis, s, url, listener);
}
} else {
String key = toConsumerPath(url);
String value = url.toFullString();
String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
jedis.hset(key, value, expire);
jedis.publish(key, Constants.SUBSCRIBE);
doNotify(jedis, service, url, listener);
}
} finally {
jedisPool.returnResource(jedis);
}
}
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String key = toConsumerPath(url);
String value = url.toFullString();
Jedis jedis = jedisPool.getResource();
try {
jedis.hdel(key, value);
jedis.publish(key, Constants.UNSUBSCRIBE);
} finally {
jedisPool.returnResource(jedis);
}
}
}
private void doNotify(Jedis jedis, String service, boolean consumer) {
for (Map.Entry<String, Set<NotifyListener>> entry : new HashMap<String, Set<NotifyListener>>(getSubscribed()).entrySet()) {
URL url = URL.valueOf(entry.getKey());
if (Constants.ANY_VALUE.equals(url.getServiceInterface())
|| (! consumer && toServicePath(url).equals(service))) {
doNotify(jedis, service, url, new HashSet<NotifyListener>(entry.getValue()));
}
}
}
private void doNotify(Jedis jedis, String service, URL url, NotifyListener listener) {
doNotify(jedis, service, url, Arrays.asList(listener));
}
private void doNotify(Jedis jedis, String service, URL url, Collection<NotifyListener> listeners) {
Map<String, String> providers;
providers = jedis.hgetAll(service + Constants.PATH_SEPARATOR + Constants.PROVIDERS);
if (url.getParameter(Constants.ADMIN_KEY, false)) {
Map<String, String> consumers = jedis.hgetAll(service + Constants.PATH_SEPARATOR + Constants.CONSUMERS);
if (consumers != null && consumers.size() > 0) {
providers = providers == null ? new HashMap<String, String>() : new HashMap<String, String>(providers);
providers.putAll(consumers);
}
}
if (logger.isInfoEnabled()) {
logger.info("redis notify: " + service + " = " + providers);
}
List<URL> urls = new ArrayList<URL>();
if (providers != null && providers.size() > 0) {
long now = System.currentTimeMillis();
for (Map.Entry<String, String> entry : providers.entrySet()) {
URL u = URL.valueOf(entry.getKey());
if (Long.parseLong(entry.getValue()) >= now) {
if (UrlUtils.isMatch(url, u)) {
urls.add(u);
}
}
}
}
if (urls != null && urls.isEmpty() && url.getParameter(Constants.ADMIN_KEY, false)) {
URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
if (Constants.ANY_VALUE.equals(empty.getServiceInterface())) {
empty = empty.setServiceInterface(service.substring(root.length()));
}
urls.add(empty);
}
for (NotifyListener listener : listeners) {
notify(url, listener, urls);
}
}
private String getService(Jedis jedis, String key) {
int i = key.indexOf(Constants.PATH_SEPARATOR, root.length());
return i > 0 ? key.substring(0, i) : key;
}
private Set<String> getServices(Jedis jedis, String pattern) {
Set<String> keys = jedis.keys(pattern);
Set<String> services = new HashSet<String>();
if (keys != null && keys.size() > 0) {
for (String key : keys) {
services.add(getService(jedis, key));
}
}
return services;
}
private String toServicePath(URL url) {
return root + url.getServiceInterface();
}
private String toProviderPath(URL url) {
return toServicePath(url) + Constants.PATH_SEPARATOR + Constants.PROVIDERS;
}
private String toConsumerPath(URL url) {
return toServicePath(url) + Constants.PATH_SEPARATOR + Constants.CONSUMERS;
}
private class NotifySub extends JedisPubSub {
@Override
public void onMessage(String key, String msg) {
if (logger.isInfoEnabled()) {
logger.info("redis event: " + key + " = " + msg);
}
if (msg.equals(Constants.REGISTER)
|| msg.equals(Constants.UNREGISTER)
|| msg.equals(Constants.SUBSCRIBE)
|| msg.equals(Constants.UNSUBSCRIBE)) {
Jedis jedis = jedisPool.getResource();
try {
doNotify(jedis, getService(jedis, key), msg.equals(Constants.SUBSCRIBE) || msg.equals(Constants.UNSUBSCRIBE));
} finally {
jedisPool.returnResource(jedis);
}
}
}
@Override
public void onPMessage(String pattern, String key, String msg) {
onMessage(key, msg);
}
@Override
public void onSubscribe(String key, int num) {
}
@Override
public void onPSubscribe(String pattern, int num) {
}
@Override
public void onUnsubscribe(String key, int num) {
}
@Override
public void onPUnsubscribe(String pattern, int num) {
}
}
private class Notifier extends Thread {
private final String service;
private volatile Jedis jedis;
private volatile boolean first = true;
private volatile boolean running = true;
private final AtomicInteger connectSkip = new AtomicInteger();
private final AtomicInteger connectSkiped = new AtomicInteger();
private final Random random = new Random();
private volatile int connectRandom;
private void resetSkip() {
connectSkip.set(0);
connectSkiped.set(0);
connectRandom = 0;
}
private boolean isSkip() {
int skip = connectSkip.get(); // 跳过次数增长
if (skip >= 10) { // 如果跳过次数增长超过10,取随机数
if (connectRandom == 0) {
connectRandom = random.nextInt(10);
}
skip = 10 + connectRandom;
}
if (connectSkiped.getAndIncrement() < skip) { // 检查跳过次数
return true;
}
connectSkip.incrementAndGet();
connectSkiped.set(0);
connectRandom = 0;
return false;
}
public Notifier(String service) {
super.setDaemon(true);
super.setName("DubboRedisSubscribe");
this.service = service;
}
@Override
public void run() {
while (running) {
try {
if (! isSkip()) {
try {
jedis = jedisPool.getResource();
try {
if (service.endsWith(Constants.ANY_VALUE)) {
if (! first) {
first = false;
for (String s : getServices(jedis, service)) {
doNotify(jedis, s, false);
}
resetSkip();
}
jedis.psubscribe(sub, service);
} else {
if (! first) {
first = false;
doNotify(jedis, service, false);
resetSkip();
}
jedis.subscribe(sub, service + Constants.PATH_SEPARATOR + Constants.PROVIDERS);
}
} finally {
jedisPool.returnBrokenResource(jedis);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
public void shutdown() {
try {
running = false;
jedis.disconnect();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}