blob: 2baec6276220f3d7c85a54935c4dc5cf90a25aa0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.linkedin.util.clock.Clock;
import org.linkedin.util.clock.SystemClock;
import org.linkedin.util.clock.Timespan;
import org.linkedin.util.concurrent.ConcurrentUtils;
import org.linkedin.util.io.PathUtils;
import org.linkedin.zookeeper.client.*;
import org.slf4j.Logger;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class);
private Map<String, String> acls;
private String password;
public void start() throws Exception {
// Grab the lock to make sure that the registration of the ManagedService
// won't be updated immediately but that the initial update will happen first
synchronized (_lock) {
_stateChangeDispatcher.setDaemon(true);
_stateChangeDispatcher.start();
doStart();
}
}
public void setACLs(Map<String, String> acls) {
this.acls = acls;
}
public void setPassword(String password) {
this.password = password;
}
protected void doStart() throws UnsupportedEncodingException {
connect();
}
@Override
public void close() {
if (_stateChangeDispatcher != null) {
_stateChangeDispatcher.end();
try {
_stateChangeDispatcher.join(1000);
} catch (Exception e) {
LOG.debug("ignored exception", e);
}
}
synchronized(_lock) {
if (_zk != null) {
try {
changeState(State.NONE);
_zk.close();
Thread th = getSendThread();
if (th != null) {
th.join(1000);
}
_zk = null;
} catch (Exception e) {
LOG.debug("ignored exception", e);
}
}
}
}
protected Thread getSendThread() {
try {
return (Thread) getField(_zk, "_zk", "cnxn", "sendThread");
} catch (Throwable e) {
return null;
}
}
protected Object getField(Object obj, String... names) throws Exception {
for (String name : names) {
obj = getField(obj, name);
}
return obj;
}
protected Object getField(Object obj, String name) throws Exception {
Class clazz = obj.getClass();
while (clazz != null) {
for (Field f : clazz.getDeclaredFields()) {
if (f.getName().equals(name)) {
f.setAccessible(true);
return f.get(obj);
}
}
}
throw new NoSuchFieldError(name);
}
protected void changeState(State newState) {
synchronized (_lock) {
State oldState = _state;
if (oldState != newState) {
_stateChangeDispatcher.addEvent(oldState, newState);
_state = newState;
_lock.notifyAll();
}
}
}
public void testGenerateConnectionLoss() throws Exception {
waitForConnected();
Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket");
callMethod(clientCnxnSocket, "testableCloseSocket");
}
protected Object callMethod(Object obj, String name, Object... args) throws Exception {
Class clazz = obj.getClass();
while (clazz != null) {
for (Method m : clazz.getDeclaredMethods()) {
if (m.getName().equals(name)) {
m.setAccessible(true);
return m.invoke(obj, args);
}
}
}
throw new NoSuchMethodError(name);
}
protected void tryConnect() {
synchronized (_lock) {
try {
connect();
} catch (Throwable e) {
LOG.warn("Error while restarting:", e);
if (_expiredSessionRecovery == null) {
_expiredSessionRecovery = new ExpiredSessionRecovery();
_expiredSessionRecovery.setDaemon(true);
_expiredSessionRecovery.start();
}
}
}
}
public void connect() throws UnsupportedEncodingException {
synchronized (_lock) {
changeState(State.CONNECTING);
_zk = _factory.createZooKeeper(this);
if (password != null) {
_zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8"));
}
}
}
public void process(WatchedEvent event) {
if (event.getState() != null) {
LOG.debug("event: {}", event.getState());
synchronized (_lock) {
switch(event.getState()) {
case SyncConnected:
changeState(State.CONNECTED);
break;
case Disconnected:
if (_state != State.NONE) {
changeState(State.RECONNECTING);
}
break;
case Expired:
// when expired, the zookeeper object is invalid and we need to recreate a new one
_zk = null;
LOG.warn("Expiration detected: trying to restart...");
tryConnect();
break;
default:
LOG.warn("Unsupported event state: {}", event.getState());
}
}
}
}
@Override
protected IZooKeeper getZk() {
State state = _state;
if (state == State.NONE) {
throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one.");
} else if (state != State.CONNECTING) {
try {
waitForConnected();
} catch (Exception e) {
throw new IllegalStateException("Error waiting for ZooKeeper connection", e);
}
}
IZooKeeper zk = _zk;
if (zk == null) {
throw new IllegalStateException("No ZooKeeper connection available");
}
return zk;
}
public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException {
waitForState(State.CONNECTED, timeout);
}
public void waitForConnected() throws InterruptedException, TimeoutException {
waitForConnected(null);
}
public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException {
long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock);
if (_state != state) {
synchronized (_lock) {
while (_state != state) {
ConcurrentUtils.awaitUntil(_clock, _lock, endTime);
}
}
}
}
@Override
public void registerListener(LifecycleListener listener) {
if (listener == null) {
throw new IllegalStateException("listener is null");
}
if (!_listeners.contains(listener)) {
_listeners.add(listener);
}
if (_state == State.CONNECTED) {
listener.onConnected();
//_stateChangeDispatcher.addEvent(null, State.CONNECTED);
}
}
@Override
public void removeListener(LifecycleListener listener) {
if (listener == null) {
throw new IllegalStateException("listener is null");
}
_listeners.remove(listener);
}
@Override
public org.linkedin.zookeeper.client.IZKClient chroot(String path) {
return new ChrootedZKClient(this, adjustPath(path));
}
@Override
public boolean isConnected() {
return _state == State.CONNECTED;
}
public boolean isConfigured() {
return _state != State.NONE;
}
@Override
public String getConnectString() {
return _factory.getConnectString();
}
public static enum State {
NONE,
CONNECTING,
CONNECTED,
RECONNECTING
}
private final static String CHARSET = "UTF-8";
private final Clock _clock = SystemClock.instance();
private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<>();
protected final Object _lock = new Object();
protected volatile State _state = State.NONE;
private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher();
protected IZooKeeperFactory _factory;
protected IZooKeeper _zk;
protected Timespan _reconnectTimeout = Timespan.parse("20s");
protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND);
private ExpiredSessionRecovery _expiredSessionRecovery = null;
private class StateChangeDispatcher extends Thread {
private final AtomicBoolean _running = new AtomicBoolean(true);
private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<>();
private StateChangeDispatcher() {
super("ZooKeeper state change dispatcher thread");
}
@Override
public void run() {
Map<Object, Boolean> history = new IdentityHashMap<>();
LOG.info("Starting StateChangeDispatcher");
while (_running.get()) {
Boolean isConnectedEvent;
try {
isConnectedEvent = _events.take();
} catch (InterruptedException e) {
continue;
}
if (!_running.get() || isConnectedEvent == null) {
continue;
}
Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent);
// we save which event each listener has seen last
// we don't update the map in place because we need to get rid of unregistered listeners
history = newHistory;
}
LOG.info("StateChangeDispatcher terminated.");
}
public void end() {
_running.set(false);
_events.add(false);
}
public void addEvent(ZKClient.State oldState, ZKClient.State newState) {
LOG.debug("addEvent: {} => {}", oldState, newState);
if (newState == ZKClient.State.CONNECTED) {
_events.add(true);
} else if (oldState == ZKClient.State.CONNECTED) {
_events.add(false);
}
}
}
protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) {
Map<Object, Boolean> newHistory = new IdentityHashMap<>();
for (LifecycleListener listener : _listeners) {
Boolean previousEvent = history.get(listener);
// we propagate the event only if it was not already sent
if (previousEvent == null || previousEvent != connectedEvent) {
try {
if (connectedEvent) {
listener.onConnected();
} else {
listener.onDisconnected();
}
} catch (Throwable e) {
LOG.warn("Exception while executing listener (ignored)", e);
}
}
newHistory.put(listener, connectedEvent);
}
return newHistory;
}
private class ExpiredSessionRecovery extends Thread {
private ExpiredSessionRecovery() {
super("ZooKeeper expired session recovery thread");
}
@Override
public void run() {
LOG.info("Entering recovery mode");
synchronized (_lock) {
try {
int count = 0;
while (_state == ZKClient.State.NONE) {
try {
count++;
LOG.warn("Recovery mode: trying to reconnect to zookeeper [{}]", count);
ZKClient.this.connect();
} catch (Throwable e) {
LOG.warn("Recovery mode: reconnect attempt failed [{}]... waiting for {}", count, _reconnectTimeout, e);
try {
_lock.wait(_reconnectTimeout.getDurationInMilliseconds());
} catch (InterruptedException e1) {
throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1);
}
}
}
} finally {
_expiredSessionRecovery = null;
LOG.info("Exiting recovery mode.");
}
}
}
}
public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher) {
this(new ZooKeeperFactory(connectString, sessionTimeout, watcher));
}
public ZKClient(IZooKeeperFactory factory) {
this(factory, null);
}
public ZKClient(IZooKeeperFactory factory, String chroot) {
super(chroot);
_factory = factory;
Map<String, String> acls = new HashMap<>();
acls.put("/", "world:anyone:acdrw");
setACLs(acls);
}
static private int getPermFromString(String permString) {
int perm = 0;
for (int i = 0; i < permString.length(); i++) {
switch (permString.charAt(i)) {
case 'r':
perm |= ZooDefs.Perms.READ;
break;
case 'w':
perm |= ZooDefs.Perms.WRITE;
break;
case 'c':
perm |= ZooDefs.Perms.CREATE;
break;
case 'd':
perm |= ZooDefs.Perms.DELETE;
break;
case 'a':
perm |= ZooDefs.Perms.ADMIN;
break;
default:
System.err.println("Unknown perm type:" + permString.charAt(i));
}
}
return perm;
}
private static List<ACL> parseACLs(String aclString) {
List<ACL> acl;
String acls[] = aclString.split(",");
acl = new ArrayList<>();
for (String a : acls) {
int firstColon = a.indexOf(':');
int lastColon = a.lastIndexOf(':');
if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
System.err.println(a + " does not have the form scheme:id:perm");
continue;
}
ACL newAcl = new ACL();
newAcl.setId(new Id(a.substring(0, firstColon), a.substring(firstColon + 1, lastColon)));
newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
acl.add(newAcl);
}
return acl;
}
public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
if (exists(path) != null) {
return setByteData(path, data);
}
try {
createBytesNodeWithParents(path, data, acl, createMode);
return null;
} catch (KeeperException.NodeExistsException e) {
// this should not happen very often (race condition)
return setByteData(path, data);
}
}
public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException {
return create(path, (byte[]) null, createMode);
}
public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
return create(path, toByteData(data), createMode);
}
public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode);
}
public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException {
return createWithParents(path, (byte[]) null, createMode);
}
public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
return createWithParents(path, toByteData(data), createMode);
}
public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
createParents(path);
return create(path, data, createMode);
}
public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
return createOrSetWithParents(path, toByteData(data), createMode);
}
public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
if (exists(path) != null) {
return setByteData(path, data);
}
try {
createWithParents(path, data, createMode);
return null;
} catch (KeeperException.NodeExistsException e) {
// this should not happen very often (race condition)
return setByteData(path, data);
}
}
public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException {
if (exists(path) != null) {
doFixACLs(path, recursive);
}
}
private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException {
setACL(path, getNodeACLs(path), -1);
if (recursive) {
for (String child : getChildren(path)) {
doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive);
}
}
}
private List<ACL> getNodeACLs(String path) {
String acl = doGetNodeACLs(adjustPath(path));
if (acl == null) {
throw new IllegalStateException("Could not find matching ACLs for " + path);
}
return parseACLs(acl);
}
protected String doGetNodeACLs(String path) {
String longestPath = "";
for (String acl : acls.keySet()) {
if (acl.length() > longestPath.length() && path.startsWith(acl)) {
longestPath = acl;
}
}
return acls.get(longestPath);
}
private void createParents(String path) throws InterruptedException, KeeperException {
path = PathUtils.getParentPath(adjustPath(path));
path = PathUtils.removeTrailingSlash(path);
List<String> paths = new ArrayList<>();
while (!path.equals("") && getZk().exists(path, false) == null) {
paths.add(path);
path = PathUtils.getParentPath(path);
path = PathUtils.removeTrailingSlash(path);
}
Collections.reverse(paths);
for (String p : paths) {
try {
getZk().create(p,
null,
getNodeACLs(p),
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
// ok we continue...
if (LOG.isDebugEnabled()) {
LOG.debug("parent already exists " + p);
}
}
}
}
private byte[] toByteData(String data) {
if (data == null) {
return null;
} else {
try {
return data.getBytes(CHARSET);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
}
}