blob: 7b2407249da86887cb935ce24dd2ceb9bc830ec8 [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.registry.zookeeper;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
import com.alibaba.dubbo.common.utils.UrlUtils;
import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.support.FailbackRegistry;
import com.alibaba.dubbo.rpc.RpcException;
/**
* ZookeeperRegistry
*
* @author william.liangf
*/
public class ZookeeperRegistry extends FailbackRegistry {
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
private final static String DEFAULT_ROOT = "dubbo";
private final String root;
// private final boolean auth;
// private final List<ACL> acl; // zkclient 0.1.0 unsupported
private final Set<String> anyServices = new ConcurrentHashSet<String>();
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, IZkChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, IZkChildListener>>();
private final ZkClient zkClient;
private volatile KeeperState zkState = KeeperState.SyncConnected;
public ZookeeperRegistry(URL url) {
super(url);
// this.auth = url.getUsername() != null && url.getUsername().length() > 0
// && url.getPassword() != null && url.getPassword().length() > 0;
// this.acl = auth ? Ids.CREATOR_ALL_ACL : Ids.OPEN_ACL_UNSAFE;
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (! group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
StringBuilder address = new StringBuilder(appendDefaultPort(url.getAddress()));
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
if (backups != null && backups.length > 0) {
for (String backup : backups) {
address.append(",");
address.append(appendDefaultPort(backup));
}
}
zkClient = new ZkClient(address.toString());
zkClient.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
ZookeeperRegistry.this.zkState = state;
}
public void handleNewSession() throws Exception {
recover();
}
});
}
public boolean isAvailable() {
return zkState == KeeperState.SyncConnected;
}
public void destroy() {
super.destroy();
try {
zkClient.close();
} catch (Exception e) {
logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
protected void doRegister(URL url) {
try {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
zkClient.createPersistent(toCategoryPath(url), true);
zkClient.createEphemeral(toUrlPath(url));
} else {
zkClient.createPersistent(toUrlPath(url), true);
}
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
protected void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, IZkChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, IZkChildListener>());
listeners = zkListeners.get(url);
}
IZkChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
for (String child : currentChilds) {
if (! anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
List<String> services = zkClient.subscribeChildChanges(root, zkListener);
if (services != null && services.size() > 0) {
anyServices.addAll(services);
for (String service : services) {
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<String> providers = new ArrayList<String>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, IZkChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, IZkChildListener>());
listeners = zkListeners.get(url);
}
IZkChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
ZookeeperRegistry.this.notify(url, listener, toUrls(url, currentChilds));
}
});
zkListener = listeners.get(listener);
}
List<String> children = zkClient.subscribeChildChanges(path, zkListener);
if (children != null) {
providers.addAll(children);
}
}
List<URL> urls = toUrls(url, providers);
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
protected void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, IZkChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
IZkChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
zkClient.unsubscribeChildChanges(toUrlPath(url), zkListener);
}
}
}
public List<URL> lookup(URL url) {
if (url == null) {
throw new IllegalArgumentException("lookup url == null");
}
try {
List<String> providers = new ArrayList<String>();
for (String path : toCategoriesPath(url)) {
try {
List<String> children = zkClient.getChildren(path);
if (children != null) {
providers.addAll(children);
}
} catch (ZkNoNodeException e) {
// ignore
}
}
return toUrls(url, providers);
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
private String toRootDir() {
if (root.equals(Constants.PATH_SEPARATOR)) {
return root;
}
return root + Constants.PATH_SEPARATOR;
}
private String toRootPath() {
return root;
}
private String toServicePath(URL url) {
String name = url.getServiceInterface();
if (Constants.ANY_VALUE.equals(name)) {
return toRootPath();
}
return toRootDir() + URL.encode(name);
}
private String[] toCategoriesPath(URL url) {
String[] categroies;
if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
categroies = new String[] {Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
} else {
categroies = url.getParameter(Constants.CATEGORY_KEY, new String[] {Constants.DEFAULT_CATEGORY});
}
String[] paths = new String[categroies.length];
for (int i = 0; i < categroies.length; i ++) {
paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categroies[i];
}
return paths;
}
private String toCategoryPath(URL url) {
return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}
private String toUrlPath(URL url) {
return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}
private List<URL> toUrls(URL consumer, List<String> providers) throws KeeperException, InterruptedException {
List<URL> urls = new ArrayList<URL>();
if (providers != null && providers.size() > 0) {
for (String provider : providers) {
provider = URL.decode(provider);
if (provider.contains("://")) {
URL url = URL.valueOf(provider);
if (UrlUtils.isMatch(consumer, url)) {
urls.add(url);
}
}
}
}
if (urls != null && urls.isEmpty() && Constants.ANY_VALUE.equals(consumer.getServiceInterface())) {
urls.add(consumer.setProtocol(Constants.EMPTY_PROTOCOL));
}
return urls;
}
static String appendDefaultPort(String address) {
if (address != null && address.length() > 0) {
int i = address.indexOf(':');
if (i < 0) {
return address + ":" + DEFAULT_ZOOKEEPER_PORT;
} else if (Integer.parseInt(address.substring(i + 1)) == 0) {
return address.substring(0, i + 1) + DEFAULT_ZOOKEEPER_PORT;
}
}
return address;
}
}