/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.remoting.zookeeper.curator; | |
import com.alibaba.dubbo.common.URL; | |
import com.alibaba.dubbo.common.utils.StringUtils; | |
import com.alibaba.dubbo.remoting.zookeeper.ChildListener; | |
import com.alibaba.dubbo.remoting.zookeeper.StateListener; | |
import com.alibaba.dubbo.remoting.zookeeper.support.AbstractZookeeperClient; | |
import org.apache.curator.framework.CuratorFramework; | |
import org.apache.curator.framework.CuratorFrameworkFactory; | |
import org.apache.curator.framework.api.CuratorWatcher; | |
import org.apache.curator.framework.state.ConnectionState; | |
import org.apache.curator.framework.state.ConnectionStateListener; | |
import org.apache.curator.retry.RetryNTimes; | |
import org.apache.zookeeper.CreateMode; | |
import org.apache.zookeeper.KeeperException.NoNodeException; | |
import org.apache.zookeeper.KeeperException.NodeExistsException; | |
import org.apache.zookeeper.WatchedEvent; | |
import java.util.Collections; | |
import java.util.List; | |
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> { | |
private final CuratorFramework client; | |
public CuratorZookeeperClient(URL url) { | |
super(url); | |
try { | |
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() | |
.connectString(url.getBackupAddress()) | |
.retryPolicy(new RetryNTimes(1, 1000)) | |
.connectionTimeoutMs(5000); | |
String authority = url.getAuthority(); | |
if (authority != null && authority.length() > 0) { | |
builder = builder.authorization("digest", authority.getBytes()); | |
} | |
client = builder.build(); | |
client.getConnectionStateListenable().addListener(new ConnectionStateListener() { | |
@Override | |
public void stateChanged(CuratorFramework client, ConnectionState state) { | |
if (state == ConnectionState.LOST) { | |
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); | |
} else if (state == ConnectionState.CONNECTED) { | |
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); | |
} else if (state == ConnectionState.RECONNECTED) { | |
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); | |
} | |
} | |
}); | |
client.start(); | |
} catch (Exception e) { | |
throw new IllegalStateException(e.getMessage(), e); | |
} | |
} | |
@Override | |
public void createPersistent(String path) { | |
try { | |
client.create().forPath(path); | |
} catch (NodeExistsException e) { | |
} catch (Exception e) { | |
throw new IllegalStateException(e.getMessage(), e); | |
} | |
} | |
@Override | |
public void createEphemeral(String path) { | |
try { | |
client.create().withMode(CreateMode.EPHEMERAL).forPath(path); | |
} catch (NodeExistsException e) { | |
} catch (Exception e) { | |
throw new IllegalStateException(e.getMessage(), e); | |
} | |
} | |
@Override | |
public void delete(String path) { | |
try { | |
client.delete().forPath(path); | |
} catch (NoNodeException e) { | |
} catch (Exception e) { | |
throw new IllegalStateException(e.getMessage(), e); | |
} | |
} | |
@Override | |
public List<String> getChildren(String path) { | |
try { | |
return client.getChildren().forPath(path); | |
} catch (NoNodeException e) { | |
return null; | |
} catch (Exception e) { | |
throw new IllegalStateException(e.getMessage(), e); | |
} | |
} | |
@Override | |
public boolean checkExists(String path) { | |
try { | |
if (client.checkExists().forPath(path) != null) { | |
return true; | |
} | |
} catch (Exception e) { | |
} | |
return false; | |
} | |
@Override | |
public boolean isConnected() { | |
return client.getZookeeperClient().isConnected(); | |
} | |
@Override | |
public void doClose() { | |
client.close(); | |
} | |
@Override | |
public CuratorWatcher createTargetChildListener(String path, ChildListener listener) { | |
return new CuratorWatcherImpl(listener); | |
} | |
@Override | |
public List<String> addTargetChildListener(String path, CuratorWatcher listener) { | |
try { | |
return client.getChildren().usingWatcher(listener).forPath(path); | |
} catch (NoNodeException e) { | |
return null; | |
} catch (Exception e) { | |
throw new IllegalStateException(e.getMessage(), e); | |
} | |
} | |
@Override | |
public void removeTargetChildListener(String path, CuratorWatcher listener) { | |
((CuratorWatcherImpl) listener).unwatch(); | |
} | |
private class CuratorWatcherImpl implements CuratorWatcher { | |
private volatile ChildListener listener; | |
public CuratorWatcherImpl(ChildListener listener) { | |
this.listener = listener; | |
} | |
public void unwatch() { | |
this.listener = null; | |
} | |
@Override | |
public void process(WatchedEvent event) throws Exception { | |
if (listener != null) { | |
String path = event.getPath() == null ? "" : event.getPath(); | |
listener.childChanged(path, | |
// if path is null, curator using watcher will throw NullPointerException. | |
// if client connect or disconnect to server, zookeeper will queue | |
// watched event(Watcher.Event.EventType.None, .., path = null). | |
StringUtils.isNotEmpty(path) | |
? client.getChildren().usingWatcher(this).forPath(path) | |
: Collections.<String>emptyList()); | |
} | |
} | |
} | |
} |