blob: ff87ae39a2889c6a14f233c6ddbdbe2a9b9df993 [file] [log] [blame]
* Copyright 2014 Fluo authors (see AUTHORS)
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
package io.fluo.core.util;
import java.util.concurrent.TimeUnit;
import io.fluo.accumulo.util.ZookeeperUtil;
import io.fluo.api.config.FluoConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CuratorUtil {
private static final Logger log = LoggerFactory.getLogger(CuratorUtil.class);
public enum NodeExistsPolicy {
private CuratorUtil() {}
* Creates a curator built using Application's zookeeper connection string. Root path will start
* at Fluo application chroot.
public static CuratorFramework newAppCurator(FluoConfiguration config) {
return newCurator(config.getAppZookeepers(), config.getZookeeperTimeout());
* Creates a curator built using Fluo's zookeeper connection string. Root path will start at Fluo
* chroot.
public static CuratorFramework newFluoCurator(FluoConfiguration config) {
return newCurator(config.getInstanceZookeepers(), config.getZookeeperTimeout());
* Creates a curator built using Fluo's zookeeper connection string. Root path will start at root
* "/" of Zookeeper.
public static CuratorFramework newRootFluoCurator(FluoConfiguration config) {
return newCurator(ZookeeperUtil.parseServers(config.getInstanceZookeepers()),
* Creates a curator built using the given zookeeper connection string and timeout
public static CuratorFramework newCurator(String zookeepers, int timeout) {
return CuratorFrameworkFactory.newClient(zookeepers, timeout, timeout,
new ExponentialBackoffRetry(1000, 10));
public static boolean putData(CuratorFramework curator, String zPath, byte[] data,
NodeExistsPolicy policy) throws KeeperException, InterruptedException {
if (policy == null) {
policy = NodeExistsPolicy.FAIL;
while (true) {
try {
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(zPath, data);
return true;
} catch (Exception nee) {
if (nee instanceof KeeperException.NodeExistsException) {
switch (policy) {
case SKIP:
return false;
try {
curator.setData().withVersion(-1).forPath(zPath, data);
return true;
} catch (Exception nne) {
if (nne instanceof KeeperException.NoNodeException) {
// node delete between create call and set data, so try create call again
} else {
throw new RuntimeException(nne);
throw (KeeperException.NodeExistsException) nee;
} else {
throw new RuntimeException(nee);
* Starts the ephemeral node and waits for it to be created
* @param node Node to start
* @param maxWaitSec Maximum time in seconds to wait
public static void startAndWait(PersistentEphemeralNode node, int maxWaitSec) {
int waitTime = 0;
try {
while (node.waitForInitialCreate(1, TimeUnit.SECONDS) == false) {
waitTime += 1;"Waited " + waitTime + " sec for ephemeral node to be created");
if (waitTime > maxWaitSec) {
throw new IllegalStateException("Failed to create ephemeral node");
} catch (InterruptedException e) {
throw new IllegalStateException(e);