blob: 62701231086b2272bac65a8b9004e1fa83de758c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.core.zk;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
public class ZKIntegration implements Watcher {
/**
* Base path for services
*/
public static String ZK_SERVICES = "services";
/**
* Base path for all Slider references
*/
public static String ZK_SLIDER = "slider";
public static String ZK_USERS = "users";
public static String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
public static String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;
public static final List<String> ZK_USERS_PATH_LIST = new ArrayList<>();
static {
ZK_USERS_PATH_LIST.add(ZK_SERVICES);
ZK_USERS_PATH_LIST.add(ZK_SLIDER);
ZK_USERS_PATH_LIST.add(ZK_USERS);
}
public static int SESSION_TIMEOUT = 5000;
protected static final Logger log =
LoggerFactory.getLogger(ZKIntegration.class);
private ZooKeeper zookeeper;
private final String username;
private final String clustername;
private final String userPath;
private int sessionTimeout = SESSION_TIMEOUT;
/**
flag to set to indicate that the user path should be created if
it is not already there
*/
private final AtomicBoolean toInit = new AtomicBoolean(false);
private final boolean createClusterPath;
private final Watcher watchEventHandler;
private final String zkConnection;
private final boolean canBeReadOnly;
protected ZKIntegration(String zkConnection,
String username,
String clustername,
boolean canBeReadOnly,
boolean createClusterPath,
Watcher watchEventHandler
) throws IOException {
this.username = username;
this.clustername = clustername;
this.watchEventHandler = watchEventHandler;
this.zkConnection = zkConnection;
this.canBeReadOnly = canBeReadOnly;
this.createClusterPath = createClusterPath;
this.userPath = mkSliderUserPath(username);
}
public void init() throws IOException {
assert zookeeper == null;
log.debug("Binding ZK client to {}", zkConnection);
zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly);
}
/**
* Create an instance bonded to the specific closure
* @param zkConnection
* @param username
* @param clustername
* @param canBeReadOnly
* @param watchEventHandler
* @return
* @throws IOException
*/
public static ZKIntegration newInstance(String zkConnection, String username, String clustername, boolean createClusterPath, boolean canBeReadOnly, Watcher watchEventHandler) throws IOException {
return new ZKIntegration(zkConnection,
username,
clustername,
canBeReadOnly,
createClusterPath,
watchEventHandler);
}
public String getConnectionString() {
return zkConnection;
}
public String getClusterPath() {
return mkClusterPath(username, clustername);
}
public boolean getConnected() {
return zookeeper.getState().isConnected();
}
public boolean getAlive() {
return zookeeper.getState().isAlive();
}
public ZooKeeper.States getState() {
return zookeeper.getState();
}
public Stat getClusterStat() throws KeeperException, InterruptedException {
return stat(getClusterPath());
}
public boolean exists(String path) throws
KeeperException,
InterruptedException {
return stat(path) != null;
}
public Stat stat(String path) throws KeeperException, InterruptedException {
return zookeeper.exists(path, false);
}
@Override
public String toString() {
return "ZK integration bound @ " + zkConnection + ": " + zookeeper;
}
/**
* Event handler to notify of state events
* @param event
*/
@Override
public void process(WatchedEvent event) {
log.debug("{}", event);
try {
maybeInit();
} catch (Exception e) {
log.error("Failed to init", e);
}
if (watchEventHandler != null) {
watchEventHandler.process(event);
}
}
private void maybeInit() throws KeeperException, InterruptedException {
if (!toInit.getAndSet(true) && createClusterPath) {
log.debug("initing");
//create the user path
mkPath(ZK_USERS_PATH_LIST, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//create the specific user
createPath(userPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* Create a path under a parent, don't care if it already exists
* As the path isn't returned, this isn't the way to create sequentially
* numbered nodes.
* @param parent parent dir. Must have a trailing / if entry!=null||empty
* @param entry entry -can be null or "", in which case it is not appended
* @param acl
* @param createMode
* @return the path if created; null if not
*/
public String createPath(String parent,
String entry,
List<ACL> acl,
CreateMode createMode) throws KeeperException, InterruptedException {
//initial create of full path
assert acl != null;
assert !acl.isEmpty();
assert parent != null;
String path = parent;
if (entry != null) {
path = path + entry;
}
try {
log.debug("Creating ZK path {}", path);
return zookeeper.create(path, null, acl, createMode);
} catch (KeeperException.NodeExistsException ignored) {
//node already there
log.debug("node already present:{}",path);
return null;
}
}
/**
* Recursive path create
* @param path
* @param data
* @param acl
* @param createMode
*/
public void mkPath(List<String> paths,
List<ACL> acl,
CreateMode createMode) throws KeeperException, InterruptedException {
String history = "/";
for (String entry : paths) {
createPath(history, entry, acl, createMode);
history = history + entry + "/";
}
}
/**
* Blocking enum of users
* @return an unordered list of clusters under a user
*/
public List<String> getClusters() throws KeeperException, InterruptedException {
return zookeeper.getChildren(userPath, null);
}
/**
* Delete a node, does not throw an exception if the path is not fond
* @param path path to delete
* @return true if the path could be deleted, false if there was no node to delete
*
*/
public boolean delete(String path) throws
InterruptedException,
KeeperException {
try {
zookeeper.delete(path, -1);
return true;
} catch (KeeperException.NoNodeException ignored) {
return false;
}
}
/**
* Build the path to a cluster; exists once the cluster has come up.
* Even before that, a ZK watcher could wait for it.
* @param username user
* @param clustername name of the cluster
* @return a strin
*/
public static String mkClusterPath(String username, String clustername) {
return mkSliderUserPath(username) + "/" + clustername;
}
/**
* Build the path to a cluster; exists once the cluster has come up.
* Even before that, a ZK watcher could wait for it.
* @param username user
* @return a string
*/
public static String mkSliderUserPath(String username) {
return SVC_SLIDER_USERS + "/" + username;
}
}