// blob: 34679c31d9ba943930dcf7ede506e0b56c81f5c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.zookeeper.curator;
import java.nio.charset.StandardCharsets;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.zookeeper.ChildListener;
import org.apache.dubbo.remoting.zookeeper.DataListener;
import org.apache.dubbo.remoting.zookeeper.EventType;
import org.apache.dubbo.remoting.zookeeper.StateListener;
import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
/**
 * Zookeeper client implemented on top of Apache Curator.
 * <p>
 * Node operations are delegated to a {@link CuratorFramework}; checked Curator/ZooKeeper
 * failures are rethrown as {@link IllegalStateException} with the original exception as cause.
 * Child listeners are implemented with one-shot Curator watchers that re-register themselves,
 * data listeners with one {@link TreeCache} per registered watcher.
 */
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZookeeperClient.CuratorWatcherImpl, CuratorZookeeperClient.CuratorWatcherImpl> {

    protected static final Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class);

    /** URL parameter that carries the ZooKeeper session timeout in milliseconds. */
    private static final String ZK_SESSION_EXPIRE_KEY = "zk.session.expire";

    /** Charset used for every String &lt;-&gt; byte[] conversion done by this client. */
    static final Charset CHARSET = StandardCharsets.UTF_8;

    private final CuratorFramework client;

    /**
     * Started tree caches, indexed by watched path and then by the watcher they feed.
     * Every registration gets its own cache, so each one is tracked individually and can be
     * closed when its watcher is removed or when the client is closed.
     */
    private final Map<String, Map<CuratorWatcherImpl, TreeCache>> treeCacheMap = new ConcurrentHashMap<>();

    /**
     * Builds and starts the Curator client described by {@code url} and waits until it is connected.
     *
     * @param url provides the backup address list, the connection timeout ({@code TIMEOUT_KEY}),
     *            the session timeout ({@code zk.session.expire}) and optional digest credentials
     * @throws IllegalStateException if the client cannot be built, started or connected within
     *                               the connection timeout; a client that was already created is
     *                               closed before the exception is thrown
     */
    public CuratorZookeeperClient(URL url) {
        super(url);
        // Local handle so that the catch block can release the client; the final field is not
        // definitely assigned there.
        CuratorFramework framework = null;
        try {
            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
            int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(timeout)
                    .sessionTimeoutMs(sessionExpireMs);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                // Encode explicitly instead of relying on the platform default charset.
                builder = builder.authorization("digest", authority.getBytes(CHARSET));
            }
            framework = builder.build();
            client = framework;
            framework.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
            framework.start();
            boolean connected = framework.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
            if (!connected) {
                throw new IllegalStateException("zookeeper not connected");
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the caller's thread.
                Thread.currentThread().interrupt();
            }
            // Do not leave a started client retrying in the background after a failed construction.
            closeQuietly(framework);
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Creates a node at {@code path} with Curator's default create mode and no payload.
     * An already existing node is only logged.
     *
     * @throws IllegalStateException on any other failure
     */
    @Override
    public void createPersistent(String path) {
        try {
            client.create().forPath(path);
        } catch (NodeExistsException e) {
            logger.warn("ZNode " + path + " already exists.", e);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Creates an ephemeral node at {@code path}. If the node already exists it is deleted and
     * the creation is attempted again.
     * <p>
     * NOTE(review): the retry is recursive and unbounded; it only terminates once the create
     * succeeds or a different exception is raised.
     *
     * @throws IllegalStateException on any failure other than "node exists"
     */
    @Override
    public void createEphemeral(String path) {
        try {
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
        } catch (NodeExistsException e) {
            logger.warn(ephemeralNodeExistsMessage(path), e);
            deletePath(path);
            createEphemeral(path);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Creates a node at {@code path} holding {@code data}; if the node already exists its data
     * is overwritten instead.
     *
     * @throws IllegalStateException if neither the create nor the fallback update succeeds
     */
    @Override
    protected void createPersistent(String path, String data) {
        byte[] dataBytes = data.getBytes(CHARSET);
        try {
            client.create().forPath(path, dataBytes);
        } catch (NodeExistsException e) {
            try {
                client.setData().forPath(path, dataBytes);
            } catch (Exception e1) {
                // Report the failure of the update itself, not the preceding "node exists".
                throw new IllegalStateException(e1.getMessage(), e1);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Creates an ephemeral node at {@code path} holding {@code data}. If the node already exists
     * it is deleted and the creation is attempted again.
     * <p>
     * NOTE(review): the retry is recursive and unbounded, see {@link #createEphemeral(String)}.
     *
     * @throws IllegalStateException on any failure other than "node exists"
     */
    @Override
    protected void createEphemeral(String path, String data) {
        byte[] dataBytes = data.getBytes(CHARSET);
        try {
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes);
        } catch (NodeExistsException e) {
            logger.warn(ephemeralNodeExistsMessage(path), e);
            deletePath(path);
            createEphemeral(path, data);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Deletes {@code path} together with its children. A missing node is not an error.
     *
     * @throws IllegalStateException on any other failure
     */
    @Override
    protected void deletePath(String path) {
        try {
            client.delete().deletingChildrenIfNeeded().forPath(path);
        } catch (NoNodeException ignored) {
            // Nothing to delete: the desired end state is already reached.
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Lists the children of {@code path}.
     *
     * @return the child names, or {@code null} if the node does not exist
     * @throws IllegalStateException on any other failure
     */
    @Override
    public List<String> getChildren(String path) {
        try {
            return client.getChildren().forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Tells whether a node exists at {@code path}.
     *
     * @return {@code true} only if the existence check succeeded and found the node;
     *         {@code false} if the node is absent or the check itself failed
     */
    @Override
    public boolean checkExists(String path) {
        try {
            if (client.checkExists().forPath(path) != null) {
                return true;
            }
        } catch (Exception ignored) {
            // NOTE(review): a failed lookup is reported as "does not exist"; presumably a
            // deliberate best-effort, confirm before turning this into an error.
        }
        return false;
    }

    /**
     * @return the connection flag reported by the underlying Curator ZooKeeper client
     */
    @Override
    public boolean isConnected() {
        return client.getZookeeperClient().isConnected();
    }

    /**
     * Reads the payload of {@code path} as a {@link #CHARSET} string.
     *
     * @return the decoded content, or {@code null} if the node is missing or holds no bytes
     * @throws IllegalStateException on any failure other than "no node"
     */
    @Override
    public String doGetContent(String path) {
        try {
            byte[] dataBytes = client.getData().forPath(path);
            return (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, CHARSET);
        } catch (NoNodeException e) {
            // ignore NoNode Exception.
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        return null;
    }

    /**
     * Closes every tracked tree cache and then the Curator client itself.
     */
    @Override
    public void doClose() {
        // Caches are closed first, while the client they are attached to is still open.
        for (Map<CuratorWatcherImpl, TreeCache> caches : treeCacheMap.values()) {
            for (TreeCache treeCache : caches.values()) {
                closeQuietly(treeCache);
            }
        }
        treeCacheMap.clear();
        client.close();
    }

    /**
     * Wraps {@code listener} into a watcher bound to {@code path}.
     */
    @Override
    public CuratorZookeeperClient.CuratorWatcherImpl createTargetChildListener(String path, ChildListener listener) {
        return new CuratorZookeeperClient.CuratorWatcherImpl(client, listener, path);
    }

    /**
     * Reads the children of {@code path} and installs {@code listener} as watcher on them.
     *
     * @return the current child names, or {@code null} if the node does not exist
     * @throws IllegalStateException on any other failure
     */
    @Override
    public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
        try {
            return client.getChildren().usingWatcher(listener).forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Wraps {@code listener} into a watcher usable as tree cache listener.
     */
    @Override
    protected CuratorZookeeperClient.CuratorWatcherImpl createTargetDataListener(String path, DataListener listener) {
        return new CuratorWatcherImpl(client, listener);
    }

    /**
     * Registers {@code treeCacheListener} for {@code path}; events are delivered without a
     * caller supplied executor.
     */
    @Override
    protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
        this.addTargetDataListener(path, treeCacheListener, null);
    }

    /**
     * Starts a new {@link TreeCache} (without data caching) on {@code path} that feeds
     * {@code treeCacheListener}.
     *
     * @param executor executor the listener is invoked on, or {@code null} to register the
     *                 listener without one
     * @throws IllegalStateException if the cache cannot be created or started; a partially
     *                               started cache is closed first
     */
    @Override
    protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
        TreeCache treeCache = null;
        try {
            treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
            if (executor == null) {
                treeCache.getListenable().addListener(treeCacheListener);
            } else {
                treeCache.getListenable().addListener(treeCacheListener, executor);
            }
            treeCache.start();
        } catch (Exception e) {
            closeQuietly(treeCache);
            throw new IllegalStateException("Add treeCache listener for path:" + path, e);
        }
        // Track the cache under its own watcher so that it can be found and closed again even
        // when several watchers observe the same path.
        TreeCache replaced = treeCacheMap
                .computeIfAbsent(path, key -> new ConcurrentHashMap<>())
                .put(treeCacheListener, treeCache);
        // The same watcher was registered before: drop the stale cache instead of leaking it.
        closeQuietly(replaced);
    }

    /**
     * Detaches {@code treeCacheListener} from {@code path}: its tree cache is closed and the
     * watcher stops forwarding events.
     */
    @Override
    protected void removeTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
        Map<CuratorWatcherImpl, TreeCache> caches = treeCacheMap.get(path);
        TreeCache treeCache = caches == null ? null : caches.remove(treeCacheListener);
        if (treeCache != null) {
            treeCache.getListenable().removeListener(treeCacheListener);
            closeQuietly(treeCache);
        }
        treeCacheListener.dataListener = null;
    }

    /**
     * Stops {@code listener} from forwarding child events.
     */
    @Override
    public void removeTargetChildListener(String path, CuratorWatcherImpl listener) {
        listener.unwatch();
    }

    /**
     * Builds the warning logged when an ephemeral node unexpectedly exists already.
     */
    private static String ephemeralNodeExistsMessage(String path) {
        return "ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                "we can just try to delete and create again.";
    }

    /**
     * Closes {@code resource} if it is not {@code null}; a failure is logged, never propagated.
     */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            logger.warn("Failed to close " + resource.getClass().getSimpleName() + ".", e);
        }
    }

    /**
     * Adapter that forwards Curator notifications to Dubbo listeners: watcher events go to a
     * {@link ChildListener}, tree cache events to a {@link DataListener}.
     */
    static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener {

        private CuratorFramework client;
        private volatile ChildListener childListener;
        private volatile DataListener dataListener;
        private String path;

        /**
         * Creates a child watcher for {@code path}.
         */
        public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
            this.client = client;
            this.childListener = listener;
            this.path = path;
        }

        /**
         * Creates a data watcher to be attached to a tree cache.
         */
        public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) {
            this.client = client;
            this.dataListener = dataListener;
        }

        protected CuratorWatcherImpl() {
        }

        /**
         * Drops the child listener; later watcher events are ignored and the watch is not renewed.
         */
        public void unwatch() {
            this.childListener = null;
        }

        /**
         * Handles a watcher event by re-reading the children of the watched path (which also
         * re-installs this watcher) and passing them to the child listener.
         */
        @Override
        public void process(WatchedEvent event) throws Exception {
            // if client connect or disconnect to server, zookeeper will queue
            // watched event(Watcher.Event.EventType.None, .., path = null).
            if (event.getType() == Watcher.Event.EventType.None) {
                return;
            }
            // Read the volatile field once: unwatch() may clear it concurrently.
            ChildListener listener = childListener;
            if (listener != null) {
                listener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
            }
        }

        /**
         * Translates a tree cache event into an {@link EventType} and forwards it, with the node
         * path and content where the event carries them, to the data listener.
         */
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            // Read the volatile field once: listener removal may clear it concurrently.
            DataListener listener = dataListener;
            if (listener == null) {
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("listen the zookeeper changed. The changed data:" + event.getData());
            }
            EventType eventType = null;
            String content = null;
            String eventPath = null;
            switch (event.getType()) {
                case NODE_ADDED:
                    eventType = EventType.NodeCreated;
                    eventPath = event.getData().getPath();
                    content = contentOf(event);
                    break;
                case NODE_UPDATED:
                    eventType = EventType.NodeDataChanged;
                    eventPath = event.getData().getPath();
                    content = contentOf(event);
                    break;
                case NODE_REMOVED:
                    eventPath = event.getData().getPath();
                    eventType = EventType.NodeDeleted;
                    break;
                case INITIALIZED:
                    eventType = EventType.INITIALIZED;
                    break;
                case CONNECTION_LOST:
                    eventType = EventType.CONNECTION_LOST;
                    break;
                case CONNECTION_RECONNECTED:
                    eventType = EventType.CONNECTION_RECONNECTED;
                    break;
                case CONNECTION_SUSPENDED:
                    eventType = EventType.CONNECTION_SUSPENDED;
                    break;
            }
            listener.dataChanged(eventPath, content, eventType);
        }

        /**
         * Decodes the payload of the node carried by {@code event}; a node without payload
         * yields the empty string.
         */
        private static String contentOf(TreeCacheEvent event) {
            byte[] payload = event.getData().getData();
            return payload == null ? "" : new String(payload, CHARSET);
        }
    }

    /**
     * Maps Curator connection state changes to the {@link StateListener} states of the
     * enclosing client, using the ZooKeeper session id to tell a recovered session from a new one.
     */
    private class CuratorConnectionStateListener implements ConnectionStateListener {

        private static final long UNKNOWN_SESSION_ID = -1L;

        /** Session id seen at the last CONNECTED event or new-session RECONNECTED event. */
        private long lastSessionId;
        private final URL url;

        public CuratorConnectionStateListener(URL url) {
            this.url = url;
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState state) {
            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
            int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);

            long sessionId = UNKNOWN_SESSION_ID;
            try {
                sessionId = client.getZookeeperClient().getZooKeeper().getSessionId();
            } catch (Exception e) {
                logger.warn("Curator client state changed, but failed to get the related zk session instance.", e);
            }

            if (state == ConnectionState.LOST) {
                logger.warn("Curator zookeeper session " + Long.toHexString(lastSessionId) + " expired.");
                CuratorZookeeperClient.this.stateChanged(StateListener.SESSION_LOST);
            } else if (state == ConnectionState.SUSPENDED) {
                logger.warn("Curator zookeeper connection of session " + Long.toHexString(sessionId) + " timed out. " +
                        "connection timeout value is " + timeout + ", session expire timeout value is " + sessionExpireMs);
                CuratorZookeeperClient.this.stateChanged(StateListener.SUSPENDED);
            } else if (state == ConnectionState.CONNECTED) {
                lastSessionId = sessionId;
                logger.info("Curator zookeeper client instance initiated successfully, session id is " + Long.toHexString(sessionId));
                CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
            } else if (state == ConnectionState.RECONNECTED) {
                // Same, known session id: only the connection was re-established.
                if (lastSessionId == sessionId && sessionId != UNKNOWN_SESSION_ID) {
                    logger.warn("Curator zookeeper connection recovered from connection lose, " +
                            "reuse the old session " + Long.toHexString(sessionId));
                    CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                } else {
                    logger.warn("New session created after old session lost, " +
                            "old session " + Long.toHexString(lastSessionId) + ", new session " + Long.toHexString(sessionId));
                    lastSessionId = sessionId;
                    CuratorZookeeperClient.this.stateChanged(StateListener.NEW_SESSION_CREATED);
                }
            }
        }
    }

    /**
     * Exposes the underlying Curator client; intended for unit tests only.
     *
     * @return the Curator client used by this instance
     */
    CuratorFramework getClient() {
        return client;
    }
}