blob: e3d45c9b1e83f09aa1178b01e46cf320045fb049 [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.registry.zookeeper;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
import com.alibaba.dubbo.common.utils.UrlUtils;
import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.support.FailbackRegistry;
import com.alibaba.dubbo.rpc.RpcException;
/**
* ZookeeperRegistry
*
* @author william.liangf
*/
public class ZookeeperRegistry extends FailbackRegistry {
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
private final static String DEFAULT_ROOT = "dubbo";
private final String root;
private final boolean auth;
private final List<ACL> acl;
private final ReentrantLock zookeeperLock = new ReentrantLock();
private final Set<String> failedWatched = new ConcurrentHashSet<String>();
private final Set<String> anyServices = new ConcurrentHashSet<String>();
private final ConcurrentMap<String, Set<NotifyListener>> anyNotifyListeners = new ConcurrentHashMap<String, Set<NotifyListener>>();
private volatile ZooKeeper zookeeper;
public ZookeeperRegistry(URL url) {
super(url);
this.auth = url.getUsername() != null && url.getUsername().length() > 0
&& url.getPassword() != null && url.getPassword().length() > 0;
this.acl = auth ? Ids.CREATOR_ALL_ACL : Ids.OPEN_ACL_UNSAFE;
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (! group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
initZookeeper();
}
@Override
protected void doRetry() {
initZookeeper();
if (failedWatched.size() > 0) {
Set<String> failed = new HashSet<String>(failedWatched);
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry watch " + failed + " to zookeeper " + getUrl());
}
for (String service : failed) {
try {
getChildren(service);
failedWatched.remove(service);
} catch (Throwable t) {
logger.warn("Failed to retry register " + failed + " to zookeeper " + getUrl() + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
}
}
private List<String> watch(String service) {
try {
ZooKeeper zk = ZookeeperRegistry.this.zookeeper;
if (zk != null) {
List<String> result = getChildren(service);
failedWatched.remove(service);
return result;
}
} catch (Throwable e) {
logger.warn("Failed to watch path " + service + " to zookeeper" + getUrl() + ", cause: " + e.getMessage(), e);
}
failedWatched.add(service);
return new ArrayList<String>(0);
}
private void initZookeeper() {
ZooKeeper zk = this.zookeeper;
if (zk == null || zk.getState() == null || ! zk.getState().isAlive()) {
zookeeperLock.lock();
try {
zk = this.zookeeper;
if (zk == null || zk.getState() == null || ! zk.getState().isAlive()) {
this.zookeeper = createZookeeper();
recover();
}
if (zk != null) {
zk.close();
}
} catch (Exception e) {
throw new IllegalStateException("Can not connect to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
} finally {
zookeeperLock.unlock();
}
}
}
static String appendDefaultPort(String address) {
if (address != null && address.length() > 0) {
int i = address.indexOf(':');
if (i < 0) {
return address + ":" + DEFAULT_ZOOKEEPER_PORT;
} else if (Integer.parseInt(address.substring(i + 1)) == 0) {
return address.substring(0, i + 1) + DEFAULT_ZOOKEEPER_PORT;
}
}
return address;
}
private ZooKeeper createZookeeper() throws Exception {
URL url = getUrl();
StringBuilder address = new StringBuilder(appendDefaultPort(url.getAddress()));
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
if (backups != null && backups.length > 0) {
for (String backup : backups) {
address.append(",");
address.append(appendDefaultPort(backup));
}
}
ZooKeeper zk = new ZooKeeper(address.toString(), url.getPositiveParameter(
Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT), new Watcher() {
public void process(WatchedEvent event) {
try {
if (event.getState() == KeeperState.Expired) {
initZookeeper();
} else if (event.getState() == KeeperState.SyncConnected
&& event.getType() == EventType.None) {
recover();
}
if (event.getType() != EventType.NodeChildrenChanged) {
return;
}
String path = event.getPath();
if (path == null || path.length() == 0) {
return;
}
List<String> children = watch(path);
if (path.equals(toRootPath())) {
List<String> services = children;
if (services != null && services.size() > 0) {
for (String service : services) {
if (anyServices.contains(service)) {
continue;
}
anyServices.add(service);
for (Map.Entry<String, Set<NotifyListener>> entry : anyNotifyListeners.entrySet()) {
URL subscribeUrl = URL.valueOf(entry.getKey()).setPath(service).addParameters(
Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false),
Constants.REGISTER_KEY, String.valueOf(false));
for (NotifyListener listener : entry.getValue()) {
subscribe(subscribeUrl, listener);
}
}
}
}
} else {
String dir = toRootDir();
String action = Constants.PROVIDERS;
String service = path;
if (service.startsWith(dir)) {
service = service.substring(dir.length());
}
int i = service.indexOf(Constants.PATH_SEPARATOR);
if (i >= 0) {
action = service.substring(i + 1);
service = service.substring(0, i);
}
service = URL.decode(service);
List<String> adminChildren = null;
for (Map.Entry<String, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
String key = entry.getKey();
URL subscribe = URL.valueOf(key);
List<String> notifies = children;
if (subscribe.getParameter(Constants.ADMIN_KEY, false)) {
if (adminChildren == null) {
adminChildren = getChildren(path.substring(0, path.lastIndexOf(Constants.PATH_SEPARATOR) + 1) + (Constants.CONSUMERS.equals(action) ? Constants.PROVIDERS : Constants.CONSUMERS));
adminChildren.addAll(children);
}
notifies = adminChildren;
} else if (Constants.CONSUMERS.equals(action)) {
continue;
}
String subscribeService = subscribe.getServiceInterface();
if (service.equals(subscribeService)) {
List<URL> list = toUrls(subscribe, notifies);
if (logger.isInfoEnabled()) {
logger.info("Zookeeper service changed, service: " + service + ", urls: " + list + ", zookeeper: " + getUrl());
}
for (NotifyListener listener : entry.getValue()) {
ZookeeperRegistry.this.notify(subscribe, listener, list);
}
}
}
}
} catch (Throwable e) {
logger.error("Failed to received event path " + event.getPath() + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
});
if (auth) {
zk.addAuthInfo(url.getUsername(), url.getPassword().getBytes());
}
return zk;
}
public boolean isAvailable() {
return zookeeper.getState().isAlive();
}
public void destroy() {
super.destroy();
try {
zookeeper.close();
} catch (Exception e) {
logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
private boolean exists(String node) {
try {
return zookeeper.exists(node, false) != null;
} catch (Throwable e) {
return false;
}
}
protected void doRegister(URL url) {
try {
String root = toRootPath();
if (root != null && root.length() > 0 && ! Constants.PATH_SEPARATOR.equals(root)
&& ! exists(root)) {
try {
zookeeper.create(root, new byte[0], acl, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
}
}
String service = toServicePath(url);
if (! exists(service)) {
try {
zookeeper.create(service, new byte[0], acl, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
}
}
String category = toCategoryPath(url);
if (! exists(category)) {
try {
zookeeper.create(category, new byte[0], acl, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
}
}
String provider = toProviderPath(url);
if (exists(provider)) {
try {
zookeeper.delete(provider, -1);
} catch (NoNodeException e) {
}
}
CreateMode createMode = Constants.ROUTE_PROTOCOL.equals(url.getProtocol()) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
try {
zookeeper.create(provider, new byte[0], acl, createMode);
} catch (NodeExistsException e) {
try {
zookeeper.delete(provider, -1);
} catch (NoNodeException e2) {
}
zookeeper.create(provider, new byte[0], acl, createMode);
}
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
protected void doUnregister(URL url) {
try {
String provider = toProviderPath(url);
zookeeper.delete(provider, -1);
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
protected void doSubscribe(URL url, NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String key = url.toFullString();
Set<NotifyListener> listeners = anyNotifyListeners.get(key);
if (listeners == null) {
anyNotifyListeners.putIfAbsent(key, new ConcurrentHashSet<NotifyListener>());
listeners = anyNotifyListeners.get(key);
}
listeners.add(listener);
String root = toRootPath();
List<String> services = getChildren(root);
if (services != null && services.size() > 0) {
anyServices.addAll(services);
for (String service : services) {
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false), Constants.REGISTER_KEY, String.valueOf(false)), listener);
}
}
} else {
if (url.getParameter(Constants.REGISTER_KEY, true)) {
register(url, null);
}
String register = toRegisterPath(url);
List<String> providers = getChildren(register);
if (url.getParameter(Constants.ADMIN_KEY, false)) {
String subscribe = toSubscribePath(url);
List<String> consumers = getChildren(subscribe);
providers.addAll(consumers);
}
List<URL> urls = toUrls(url, providers);
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
private List<String> getChildren(String service) throws KeeperException, InterruptedException {
try {
List<String> list = zookeeper.getChildren(service, true);
if (list == null || list.size() == 0) {
return new ArrayList<String>(0);
}
List<String> result = new ArrayList<String>();
for (String value : list) {
result.add(URL.decode(value));
}
return result;
} catch (KeeperException e) {
if (e instanceof KeeperException.NoNodeException) {
return new ArrayList<String>(0);
}
throw e;
}
}
protected void doUnsubscribe(URL url, NotifyListener listener) {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String key = url.toFullString();
Set<NotifyListener> listeners = anyNotifyListeners.get(key);
if (listeners != null) {
listeners.remove(listener);
}
} else if (url.getParameter(Constants.REGISTER_KEY, true)) {
unregister(url, null);
}
}
public List<URL> lookup(URL url) {
if (url == null) {
throw new IllegalArgumentException("lookup url == null");
}
try {
String register = toRegisterPath(url);
List<String> providers = getChildren(register);
if (url.getParameter(Constants.ADMIN_KEY, false)) {
String subscribe = toSubscribePath(url);
List<String> consumers = getChildren(subscribe);
providers.addAll(consumers);
}
return toUrls(url, providers);
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
private String toRootDir() {
if (root.equals(Constants.PATH_SEPARATOR)) {
return root;
}
return root + Constants.PATH_SEPARATOR;
}
private String toRootPath() {
return root;
}
private String toServicePath(URL url) {
String name = url.getServiceInterface();
if (Constants.ANY_VALUE.equals(name)) {
return toRootPath();
}
return toRootDir() + URL.encode(name);
}
private String toCategoryPath(URL url) {
if (Constants.SUBSCRIBE_PROTOCOL.equals(url.getProtocol())) {
return toSubscribePath(url);
} else {
return toRegisterPath(url);
}
}
private String toRegisterPath(URL url) {
return toServicePath(url) + Constants.PATH_SEPARATOR + Constants.PROVIDERS;
}
private String toSubscribePath(URL url) {
return toServicePath(url) + Constants.PATH_SEPARATOR + Constants.CONSUMERS;
}
private String toProviderPath(URL url) {
return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}
private List<URL> toUrls(URL consumer, List<String> providers) throws KeeperException, InterruptedException {
List<URL> urls = new ArrayList<URL>();
if (providers != null && providers.size() > 0) {
for (String provider : providers) {
provider = URL.decode(provider);
if (provider.contains("://")) {
URL url = URL.valueOf(provider);
if (UrlUtils.isMatch(consumer, url)) {
urls.add(url);
}
}
}
}
if (urls != null && urls.isEmpty() && consumer.getParameter(Constants.ADMIN_KEY, false)) {
urls.add(consumer.setProtocol(Constants.EMPTY_PROTOCOL));
}
return urls;
}
}