// blob: ca40e161e34d7a260e8f0dcf94b5d3810f1f2edf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.zk;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.DefaultWatcherCallBack;
import com.alibaba.jstorm.callback.WatcherCallBack;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
/**
 * Simple wrapper around Curator for the ZooKeeper operations used here: client
 * construction, node create/read/update/delete, recursive directory creation and
 * an in-process ZooKeeper server helper.
 *
 * <p>
 * Every {@code path} argument is passed through {@link PathUtils#normalize_path}
 * before it is handed to Curator.
 *
 * @author yannian
 *
 */
public class Zookeeper {
    private static final Logger LOG = LoggerFactory.getLogger(Zookeeper.class);

    /**
     * Creates and starts a client whose watch events are handled by a new
     * {@link DefaultWatcherCallBack}.
     *
     * @param conf    configuration forwarded to {@link Utils#newCurator}
     * @param servers ZooKeeper server list forwarded to {@link Utils#newCurator}
     * @param port    ZooKeeper port forwarded to {@link Utils#newCurator}
     * @param root    root path forwarded to {@link Utils#newCurator}
     * @return the started Curator client
     */
    public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root) {
        return mkClient(conf, servers, port, root, new DefaultWatcherCallBack());
    }

    /**
     * Connects to ZooKeeper, registers the watch listener and the unhandled-error
     * listener, and starts the client.
     *
     * @param conf    configuration forwarded to {@link Utils#newCurator}
     * @param servers ZooKeeper server list forwarded to {@link Utils#newCurator}
     * @param port    ZooKeeper port forwarded to {@link Utils#newCurator}
     * @param root    root path forwarded to {@link Utils#newCurator}
     * @param watcher callback invoked with the state, type and path of every
     *                {@link CuratorEventType#WATCHED} event
     * @return the started Curator client
     */
    public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
        CuratorFramework fk = Utils.newCurator(conf, servers, port, root);

        // Forward only WATCHED events to the caller's callback; other event types are ignored.
        fk.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception {
                if (e.getType().equals(CuratorEventType.WATCHED)) {
                    WatchedEvent event = e.getWatchedEvent();
                    watcher.execute(event.getState(), event.getType(), event.getPath());
                }
            }
        });

        // An unhandled Curator error is logged and then the process is halted.
        fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
            @Override
            public void unhandledError(String msg, Throwable error) {
                String errmsg = "Unrecoverable Zookeeper error, halting process: " + msg;
                LOG.error(errmsg, error);
                JStormUtils.halt_process(1, "Unrecoverable Zookeeper error");
            }
        });

        fk.start();
        return fk;
    }

    /**
     * Creates a node with the given create mode and {@link ZooDefs.Ids#OPEN_ACL_UNSAFE}.
     *
     * @param zk   Curator client
     * @param path node path (normalized before use)
     * @param data node payload
     * @param mode ZooKeeper create mode
     * @return the path string returned by Curator for the created node
     * @throws Exception if the create call fails
     */
    public String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode) throws Exception {
        String npath = PathUtils.normalize_path(path);
        return zk.create().withMode(mode).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(npath, data);
    }

    /**
     * Creates a {@code PERSISTENT} node.
     *
     * @param zk   Curator client
     * @param path node path (normalized before use)
     * @param data node payload
     * @return the path string returned by Curator for the created node
     * @throws Exception if the create call fails
     */
    public String createNode(CuratorFramework zk, String path, byte[] data) throws Exception {
        return createNode(zk, path, data, org.apache.zookeeper.CreateMode.PERSISTENT);
    }

    /**
     * Checks whether a node exists.
     *
     * @param zk    Curator client
     * @param path  node path (normalized before use)
     * @param watch when {@code true} the check is issued with {@code watched()}
     * @return {@code true} if Curator returned a non-null {@link Stat}
     * @throws Exception if the exists call fails
     */
    public boolean existsNode(CuratorFramework zk, String path, boolean watch) throws Exception {
        String npath = PathUtils.normalize_path(path);
        Stat stat = watch ? zk.checkExists().watched().forPath(npath) : zk.checkExists().forPath(npath);
        return stat != null;
    }

    /**
     * Deletes a single node.
     *
     * @param zk   Curator client
     * @param path node path (normalized before use)
     * @throws Exception if the delete call fails
     */
    public void deleteNode(CuratorFramework zk, String path) throws Exception {
        zk.delete().forPath(PathUtils.normalize_path(path));
    }

    /**
     * Creates the node at {@code path} and any missing ancestors as
     * {@code PERSISTENT} nodes. Does nothing for the root path or when the node
     * already exists.
     *
     * <p>
     * A {@link KeeperException} raised while creating a node is logged at WARN and
     * not rethrown, because several clients may run mkdirs for the same path at the
     * same time.
     *
     * @param zk   Curator client
     * @param path directory path (normalized before use)
     * @throws Exception if the existence check fails, or the create fails with
     *                   something other than a {@link KeeperException}
     */
    public void mkdirs(CuratorFramework zk, String path) throws Exception {
        String npath = PathUtils.normalize_path(path);

        // Nothing to create for the root or for a node that is already present.
        if (npath.equals("/") || existsNode(zk, npath, false)) {
            return;
        }

        // Parents first, then this node.
        mkdirs(zk, PathUtils.parent_path(npath));
        try {
            createNode(zk, npath, JStormUtils.barr((byte) 7), org.apache.zookeeper.CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            // this can happen when multiple clients doing mkdir at same time
            LOG.warn("zookeeper mkdirs for path " + path, e);
        }
    }

    /**
     * Reads the data of a node.
     *
     * @param zk    Curator client
     * @param path  node path (normalized before use)
     * @param watch when {@code true} both the existence check and the read are
     *              issued with {@code watched()}
     * @return the node data, or {@code null} if the node does not exist or a
     *         {@link KeeperException} occurred (the exception is logged at ERROR)
     * @throws Exception if a failure other than a {@link KeeperException} occurs
     */
    public byte[] getData(CuratorFramework zk, String path, boolean watch) throws Exception {
        String npath = PathUtils.normalize_path(path);
        try {
            if (!existsNode(zk, npath, watch)) {
                return null;
            }
            return watch ? zk.getData().watched().forPath(npath) : zk.getData().forPath(npath);
        } catch (KeeperException e) {
            LOG.error("zookeeper getdata for path " + path, e);
            return null;
        }
    }

    /**
     * Lists the children of a node.
     *
     * @param zk    Curator client
     * @param path  node path (normalized before use)
     * @param watch when {@code true} the call is issued with {@code watched()}
     * @return the child list returned by Curator
     * @throws Exception if the call fails
     */
    public List<String> getChildren(CuratorFramework zk, String path, boolean watch) throws Exception {
        String npath = PathUtils.normalize_path(path);
        return watch ? zk.getChildren().watched().forPath(npath) : zk.getChildren().forPath(npath);
    }

    /**
     * Overwrites the data of a node.
     *
     * @param zk   Curator client
     * @param path node path (normalized before use)
     * @param data new payload
     * @return the {@link Stat} returned by Curator
     * @throws Exception if the call fails
     */
    public Stat setData(CuratorFramework zk, String path, byte[] data) throws Exception {
        String npath = PathUtils.normalize_path(path);
        return zk.setData().forPath(npath, data);
    }

    /**
     * Alias for {@link #existsNode(CuratorFramework, String, boolean)}.
     *
     * @param zk    Curator client
     * @param path  node path (normalized before use)
     * @param watch when {@code true} the check is issued with {@code watched()}
     * @return {@code true} if the node exists
     * @throws Exception if the exists call fails
     */
    public boolean exists(CuratorFramework zk, String path, boolean watch) throws Exception {
        return existsNode(zk, path, watch);
    }

    /**
     * Deletes a node together with its children, if the node exists.
     *
     * <p>
     * Correctly spelled entry point; it delegates to
     * {@link #deletereRcursive(CuratorFramework, String)} so that existing overrides
     * of the legacy method are still honoured.
     *
     * @param zk   Curator client
     * @param path node path (normalized before use)
     * @throws Exception if the existence check or the delete fails
     */
    public void deleteRecursive(CuratorFramework zk, String path) throws Exception {
        deletereRcursive(zk, path);
    }

    /**
     * Deletes a node together with its children, if the node exists. The delete is
     * issued with Curator's {@code guaranteed()} and
     * {@code deletingChildrenIfNeeded()} options.
     *
     * <p>
     * The method name is misspelled and is kept for backward compatibility; new
     * code should call {@link #deleteRecursive(CuratorFramework, String)}.
     *
     * @param zk   Curator client
     * @param path node path (normalized before use)
     * @throws Exception if the existence check or the delete fails
     */
    public void deletereRcursive(CuratorFramework zk, String path) throws Exception {
        String npath = PathUtils.normalize_path(path);
        if (existsNode(zk, npath, false)) {
            zk.delete().guaranteed().deletingChildrenIfNeeded().forPath(npath);
        }
    }

    /**
     * Starts a ZooKeeper server inside the current process.
     *
     * @param localdir directory passed to {@link ZooKeeperServer} as both of its
     *                 directory arguments
     * @param port     port the connection factory is bound to
     * @return the started connection factory; pass it to
     *         {@link #shutdownInprocessZookeeper(Factory)} to stop the server
     * @throws IOException          declared by this method; propagated from server
     *                              or factory setup
     * @throws InterruptedException declared by this method; propagated from server
     *                              or factory setup
     */
    public static Factory mkInprocessZookeeper(String localdir, int port) throws IOException, InterruptedException {
        LOG.info("Starting inprocess zookeeper at port " + port + " and dir " + localdir);
        File localfile = new File(localdir);
        // Third argument is 2000; presumably the tick time in milliseconds - confirm against the ZooKeeper version in use.
        ZooKeeperServer zk = new ZooKeeperServer(localfile, localfile, 2000);
        Factory factory = new Factory(new InetSocketAddress(port), 0);
        factory.startup(zk);
        return factory;
    }

    /**
     * Shuts down a server started by {@link #mkInprocessZookeeper(String, int)}.
     *
     * @param handle the factory returned by {@link #mkInprocessZookeeper(String, int)}
     */
    public void shutdownInprocessZookeeper(Factory handle) {
        handle.shutdown();
    }
}