/*
 * Copyright 1999-2011 Alibaba Group.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.registry.zookeeper; | |
import java.util.ArrayList; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.locks.ReentrantLock; | |
import org.apache.zookeeper.CreateMode; | |
import org.apache.zookeeper.KeeperException; | |
import org.apache.zookeeper.KeeperException.NoNodeException; | |
import org.apache.zookeeper.KeeperException.NodeExistsException; | |
import org.apache.zookeeper.WatchedEvent; | |
import org.apache.zookeeper.Watcher; | |
import org.apache.zookeeper.Watcher.Event.EventType; | |
import org.apache.zookeeper.Watcher.Event.KeeperState; | |
import org.apache.zookeeper.ZooDefs.Ids; | |
import org.apache.zookeeper.ZooKeeper; | |
import org.apache.zookeeper.data.ACL; | |
import com.alibaba.dubbo.common.Constants; | |
import com.alibaba.dubbo.common.URL; | |
import com.alibaba.dubbo.common.logger.Logger; | |
import com.alibaba.dubbo.common.logger.LoggerFactory; | |
import com.alibaba.dubbo.common.utils.ConcurrentHashSet; | |
import com.alibaba.dubbo.common.utils.UrlUtils; | |
import com.alibaba.dubbo.registry.NotifyListener; | |
import com.alibaba.dubbo.registry.support.FailbackRegistry; | |
import com.alibaba.dubbo.rpc.RpcException; | |
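/**
 * ZookeeperRegistry: a {@link FailbackRegistry} implementation that stores
 * registrations as ZooKeeper nodes and uses child watches for notification.
 *
 * @author william.liangf
 */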
public class ZookeeperRegistry extends FailbackRegistry { | |
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class); | |
private final static int DEFAULT_ZOOKEEPER_PORT = 2181; | |
private final static String DEFAULT_ROOT = "dubbo"; | |
private final String root; | |
private final boolean auth; | |
private final List<ACL> acl; | |
private final ReentrantLock zookeeperLock = new ReentrantLock(); | |
private final Set<String> failedWatched = new ConcurrentHashSet<String>(); | |
private final Set<String> anyServices = new ConcurrentHashSet<String>(); | |
private final ConcurrentMap<String, Set<NotifyListener>> anyNotifyListeners = new ConcurrentHashMap<String, Set<NotifyListener>>(); | |
private volatile ZooKeeper zookeeper; | |
public ZookeeperRegistry(URL url) { | |
super(url); | |
this.auth = url.getUsername() != null && url.getUsername().length() > 0 | |
&& url.getPassword() != null && url.getPassword().length() > 0; | |
this.acl = auth ? Ids.CREATOR_ALL_ACL : Ids.OPEN_ACL_UNSAFE; | |
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); | |
if (! group.startsWith(Constants.PATH_SEPARATOR)) { | |
group = Constants.PATH_SEPARATOR + group; | |
} | |
this.root = group; | |
initZookeeper(); | |
} | |
@Override | |
protected void doRetry() { | |
initZookeeper(); | |
if (failedWatched.size() > 0) { | |
Set<String> failed = new HashSet<String>(failedWatched); | |
if (failed.size() > 0) { | |
if (logger.isInfoEnabled()) { | |
logger.info("Retry watch " + failed + " to zookeeper " + getUrl()); | |
} | |
for (String service : failed) { | |
try { | |
getChildren(service); | |
failedWatched.remove(service); | |
} catch (Throwable t) { | |
logger.warn("Failed to retry register " + failed + " to zookeeper " + getUrl() + ", waiting for again, cause: " + t.getMessage(), t); | |
} | |
} | |
} | |
} | |
} | |
private List<String> watch(String service) { | |
try { | |
ZooKeeper zk = ZookeeperRegistry.this.zookeeper; | |
if (zk != null) { | |
List<String> result = getChildren(service); | |
failedWatched.remove(service); | |
return result; | |
} | |
} catch (Throwable e) { | |
logger.warn("Failed to watch path " + service + " to zookeeper" + getUrl() + ", cause: " + e.getMessage(), e); | |
} | |
failedWatched.add(service); | |
return new ArrayList<String>(0); | |
} | |
private void initZookeeper() { | |
ZooKeeper zk = this.zookeeper; | |
if (zk == null || zk.getState() == null || ! zk.getState().isAlive()) { | |
zookeeperLock.lock(); | |
try { | |
zk = this.zookeeper; | |
if (zk == null || zk.getState() == null || ! zk.getState().isAlive()) { | |
this.zookeeper = createZookeeper(); | |
recover(); | |
} | |
if (zk != null) { | |
zk.close(); | |
} | |
} catch (Exception e) { | |
throw new IllegalStateException("Can not connect to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); | |
} finally { | |
zookeeperLock.unlock(); | |
} | |
} | |
} | |
static String appendDefaultPort(String address) { | |
if (address != null && address.length() > 0) { | |
int i = address.indexOf(':'); | |
if (i < 0) { | |
return address + ":" + DEFAULT_ZOOKEEPER_PORT; | |
} else if (Integer.parseInt(address.substring(i + 1)) == 0) { | |
return address.substring(0, i + 1) + DEFAULT_ZOOKEEPER_PORT; | |
} | |
} | |
return address; | |
} | |
private ZooKeeper createZookeeper() throws Exception { | |
URL url = getUrl(); | |
StringBuilder address = new StringBuilder(appendDefaultPort(url.getAddress())); | |
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]); | |
if (backups != null && backups.length > 0) { | |
for (String backup : backups) { | |
address.append(","); | |
address.append(appendDefaultPort(backup)); | |
} | |
} | |
ZooKeeper zk = new ZooKeeper(address.toString(), url.getPositiveParameter( | |
Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT), new Watcher() { | |
public void process(WatchedEvent event) { | |
try { | |
if (event.getState() == KeeperState.Expired) { | |
initZookeeper(); | |
} else if (event.getState() == KeeperState.SyncConnected | |
&& event.getType() == EventType.None) { | |
recover(); | |
} | |
if (event.getType() != EventType.NodeChildrenChanged) { | |
return; | |
} | |
String path = event.getPath(); | |
if (path == null || path.length() == 0) { | |
return; | |
} | |
List<String> children = watch(path); | |
if (path.equals(toRootPath())) { | |
List<String> services = children; | |
if (services != null && services.size() > 0) { | |
for (String service : services) { | |
if (anyServices.contains(service)) { | |
continue; | |
} | |
anyServices.add(service); | |
for (Map.Entry<String, Set<NotifyListener>> entry : anyNotifyListeners.entrySet()) { | |
URL subscribeUrl = URL.valueOf(entry.getKey()).setPath(service).addParameters( | |
Constants.INTERFACE_KEY, service, | |
Constants.CHECK_KEY, String.valueOf(false), | |
Constants.REGISTER_KEY, String.valueOf(false)); | |
for (NotifyListener listener : entry.getValue()) { | |
subscribe(subscribeUrl, listener); | |
} | |
} | |
} | |
} | |
} else { | |
String dir = toRootDir(); | |
String action = Constants.PROVIDERS; | |
String service = path; | |
if (service.startsWith(dir)) { | |
service = service.substring(dir.length()); | |
} | |
int i = service.indexOf(Constants.PATH_SEPARATOR); | |
if (i >= 0) { | |
action = service.substring(i + 1); | |
service = service.substring(0, i); | |
} | |
service = URL.decode(service); | |
List<String> adminChildren = null; | |
for (Map.Entry<String, Set<NotifyListener>> entry : getSubscribed().entrySet()) { | |
String key = entry.getKey(); | |
URL subscribe = URL.valueOf(key); | |
List<String> notifies = children; | |
if (subscribe.getParameter(Constants.ADMIN_KEY, false)) { | |
if (adminChildren == null) { | |
adminChildren = getChildren(path.substring(0, path.lastIndexOf(Constants.PATH_SEPARATOR) + 1) + (Constants.CONSUMERS.equals(action) ? Constants.PROVIDERS : Constants.CONSUMERS)); | |
adminChildren.addAll(children); | |
} | |
notifies = adminChildren; | |
} else if (Constants.CONSUMERS.equals(action)) { | |
continue; | |
} | |
String subscribeService = subscribe.getServiceInterface(); | |
if (service.equals(subscribeService)) { | |
List<URL> list = toUrls(subscribe, notifies); | |
if (logger.isInfoEnabled()) { | |
logger.info("Zookeeper service changed, service: " + service + ", urls: " + list + ", zookeeper: " + getUrl()); | |
} | |
for (NotifyListener listener : entry.getValue()) { | |
ZookeeperRegistry.this.notify(subscribe, listener, list); | |
} | |
} | |
} | |
} | |
} catch (Throwable e) { | |
logger.error("Failed to received event path " + event.getPath() + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); | |
} | |
} | |
}); | |
if (auth) { | |
zk.addAuthInfo(url.getUsername(), url.getPassword().getBytes()); | |
} | |
return zk; | |
} | |
public boolean isAvailable() { | |
return zookeeper.getState().isAlive(); | |
} | |
public void destroy() { | |
super.destroy(); | |
try { | |
zookeeper.close(); | |
} catch (Exception e) { | |
logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e); | |
} | |
} | |
private boolean exists(String node) { | |
try { | |
return zookeeper.exists(node, false) != null; | |
} catch (Throwable e) { | |
return false; | |
} | |
} | |
protected void doRegister(URL url) { | |
try { | |
String root = toRootPath(); | |
if (root != null && root.length() > 0 && ! Constants.PATH_SEPARATOR.equals(root) | |
&& ! exists(root)) { | |
try { | |
zookeeper.create(root, new byte[0], acl, CreateMode.PERSISTENT); | |
} catch (NodeExistsException e) { | |
} | |
} | |
String service = toServicePath(url); | |
if (! exists(service)) { | |
try { | |
zookeeper.create(service, new byte[0], acl, CreateMode.PERSISTENT); | |
} catch (NodeExistsException e) { | |
} | |
} | |
String category = toCategoryPath(url); | |
if (! exists(category)) { | |
try { | |
zookeeper.create(category, new byte[0], acl, CreateMode.PERSISTENT); | |
} catch (NodeExistsException e) { | |
} | |
} | |
String provider = toProviderPath(url); | |
if (exists(provider)) { | |
try { | |
zookeeper.delete(provider, -1); | |
} catch (NoNodeException e) { | |
} | |
} | |
CreateMode createMode = Constants.ROUTE_PROTOCOL.equals(url.getProtocol()) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL; | |
try { | |
zookeeper.create(provider, new byte[0], acl, createMode); | |
} catch (NodeExistsException e) { | |
try { | |
zookeeper.delete(provider, -1); | |
} catch (NoNodeException e2) { | |
} | |
zookeeper.create(provider, new byte[0], acl, createMode); | |
} | |
} catch (Throwable e) { | |
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); | |
} | |
} | |
protected void doUnregister(URL url) { | |
try { | |
String provider = toProviderPath(url); | |
zookeeper.delete(provider, -1); | |
} catch (Throwable e) { | |
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); | |
} | |
} | |
protected void doSubscribe(URL url, NotifyListener listener) { | |
try { | |
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { | |
String key = url.toFullString(); | |
Set<NotifyListener> listeners = anyNotifyListeners.get(key); | |
if (listeners == null) { | |
anyNotifyListeners.putIfAbsent(key, new ConcurrentHashSet<NotifyListener>()); | |
listeners = anyNotifyListeners.get(key); | |
} | |
listeners.add(listener); | |
String root = toRootPath(); | |
List<String> services = getChildren(root); | |
if (services != null && services.size() > 0) { | |
anyServices.addAll(services); | |
for (String service : services) { | |
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, | |
Constants.CHECK_KEY, String.valueOf(false), Constants.REGISTER_KEY, String.valueOf(false)), listener); | |
} | |
} | |
} else { | |
if (url.getParameter(Constants.REGISTER_KEY, true)) { | |
register(url, null); | |
} | |
String register = toRegisterPath(url); | |
List<String> providers = getChildren(register); | |
if (url.getParameter(Constants.ADMIN_KEY, false)) { | |
String subscribe = toSubscribePath(url); | |
List<String> consumers = getChildren(subscribe); | |
providers.addAll(consumers); | |
} | |
List<URL> urls = toUrls(url, providers); | |
notify(url, listener, urls); | |
} | |
} catch (Throwable e) { | |
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); | |
} | |
} | |
private List<String> getChildren(String service) throws KeeperException, InterruptedException { | |
try { | |
List<String> list = zookeeper.getChildren(service, true); | |
if (list == null || list.size() == 0) { | |
return new ArrayList<String>(0); | |
} | |
List<String> result = new ArrayList<String>(); | |
for (String value : list) { | |
result.add(URL.decode(value)); | |
} | |
return result; | |
} catch (KeeperException e) { | |
if (e instanceof KeeperException.NoNodeException) { | |
return new ArrayList<String>(0); | |
} | |
throw e; | |
} | |
} | |
protected void doUnsubscribe(URL url, NotifyListener listener) { | |
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { | |
String key = url.toFullString(); | |
Set<NotifyListener> listeners = anyNotifyListeners.get(key); | |
if (listeners != null) { | |
listeners.remove(listener); | |
} | |
} else if (url.getParameter(Constants.REGISTER_KEY, true)) { | |
unregister(url, null); | |
} | |
} | |
public List<URL> lookup(URL url) { | |
if (url == null) { | |
throw new IllegalArgumentException("lookup url == null"); | |
} | |
try { | |
String register = toRegisterPath(url); | |
List<String> providers = getChildren(register); | |
if (url.getParameter(Constants.ADMIN_KEY, false)) { | |
String subscribe = toSubscribePath(url); | |
List<String> consumers = getChildren(subscribe); | |
providers.addAll(consumers); | |
} | |
return toUrls(url, providers); | |
} catch (Throwable e) { | |
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); | |
} | |
} | |
private String toRootDir() { | |
if (root.equals(Constants.PATH_SEPARATOR)) { | |
return root; | |
} | |
return root + Constants.PATH_SEPARATOR; | |
} | |
private String toRootPath() { | |
return root; | |
} | |
private String toServicePath(URL url) { | |
String name = url.getServiceInterface(); | |
if (Constants.ANY_VALUE.equals(name)) { | |
return toRootPath(); | |
} | |
return toRootDir() + URL.encode(name); | |
} | |
private String toCategoryPath(URL url) { | |
if (Constants.SUBSCRIBE_PROTOCOL.equals(url.getProtocol())) { | |
return toSubscribePath(url); | |
} else { | |
return toRegisterPath(url); | |
} | |
} | |
private String toRegisterPath(URL url) { | |
return toServicePath(url) + Constants.PATH_SEPARATOR + Constants.PROVIDERS; | |
} | |
private String toSubscribePath(URL url) { | |
return toServicePath(url) + Constants.PATH_SEPARATOR + Constants.CONSUMERS; | |
} | |
private String toProviderPath(URL url) { | |
return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString()); | |
} | |
private List<URL> toUrls(URL consumer, List<String> providers) throws KeeperException, InterruptedException { | |
List<URL> urls = new ArrayList<URL>(); | |
if (providers != null && providers.size() > 0) { | |
for (String provider : providers) { | |
provider = URL.decode(provider); | |
if (provider.contains("://")) { | |
URL url = URL.valueOf(provider); | |
if (UrlUtils.isMatch(consumer, url)) { | |
urls.add(url); | |
} | |
} | |
} | |
} | |
if (urls != null && urls.isEmpty() && consumer.getParameter(Constants.ADMIN_KEY, false)) { | |
urls.add(consumer.setProtocol(Constants.EMPTY_PROTOCOL)); | |
} | |
return urls; | |
} | |
} |