blob: e337b1f5c69e743b03269742709409d5b2adca1b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.cluster;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.*;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.storm.Config;
import org.apache.storm.callback.DefaultWatcherCallBack;
import org.apache.storm.callback.WatcherCallBack;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.utils.Utils;
import org.apache.storm.zookeeper.Zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * {@link IStateStorage} implementation that stores cluster state in ZooKeeper through Curator clients.
 *
 * <p>All mutating operations go through {@code zkWriter}. Read operations go through
 * {@code zkReader}, which is a dedicated second client only when the owning daemon is Nimbus;
 * for every other daemon type both fields refer to the same client.
 *
 * <p>Watch events delivered to either client are fanned out to every callback registered via
 * {@link #register(ZKStateChangedCallback)} for as long as the storage has not been closed.
 */
public class ZKStateStorage implements IStateStorage {

    private static final Logger LOG = LoggerFactory.getLogger(ZKStateStorage.class);

    /** Registered state-change callbacks, keyed by the id handed out by {@link #register}. */
    private final ConcurrentHashMap<String, ZKStateChangedCallback> callbacks =
            new ConcurrentHashMap<String, ZKStateChangedCallback>();

    /** Client used for every create / update / delete / sync operation. */
    private final CuratorFramework zkWriter;

    /** Client used for reads; the same instance as {@link #zkWriter} unless this is Nimbus. */
    private final CuratorFramework zkReader;

    /** {@code true} until {@link #close()} is called; gates delivery of watch events. */
    private final AtomicBoolean active = new AtomicBoolean(true);

    /** Whether the daemon type reported by the context equals {@code DaemonType.NIMBUS}. */
    private final boolean isNimbus;

    // Kept as a raw Map because it is handed straight to Zookeeper.mkClient unchanged.
    private final Map authConf;

    private final Map<Object, Object> conf;

    /**
     * Watcher installed on the reader and writer clients. While the storage is active it logs
     * every event at debug level and forwards every event whose type is not {@code None} to
     * all registered callbacks.
     */
    private class ZkWatcherCallBack implements WatcherCallBack {
        @Override
        public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
            // Events arriving after close() are dropped silently.
            if (!active.get()) {
                return;
            }

            if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
                LOG.debug("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
            } else {
                LOG.debug("Received event {} : {} : {}", state, type, path);
            }

            // EventType.None events are only logged, never forwarded to callbacks.
            if (!type.equals(Watcher.Event.EventType.None)) {
                for (ZKStateChangedCallback fn : callbacks.values()) {
                    fn.changed(type, path);
                }
            }
        }
    }

    /**
     * Ensures the configured {@code STORM_ZOOKEEPER_ROOT} path exists and then opens the
     * writer client and, for Nimbus only, a separate reader client.
     *
     * @param conf     configuration; read for the ZooKeeper servers, port and root path
     * @param authConf auth configuration passed through to {@code Zookeeper.mkClient}
     * @param acls     ACLs used when creating the root path
     * @param context  supplies the daemon type that decides whether a separate reader is opened
     * @throws Exception whatever client creation or root-path creation throws; any client
     *                   opened before the failure is closed before the exception propagates
     */
    public ZKStateStorage(Map<Object, Object> conf, Map authConf, List<ACL> acls, ClusterStateContext context) throws Exception {
        this.conf = conf;
        this.authConf = authConf;
        this.isNimbus = context.getDaemonType().equals(DaemonType.NIMBUS);

        // A throw-away client (not rooted at STORM_ZOOKEEPER_ROOT) is used just to mkdir the root.
        CuratorFramework zkTemp = mkZk();
        try {
            String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
            Zookeeper.mkdirs(zkTemp, rootPath, acls);
        } finally {
            // Close even when mkdirs fails so the temporary connection is never leaked.
            zkTemp.close();
        }

        CuratorFramework writer = mkZk(new ZkWatcherCallBack());
        CuratorFramework reader = writer;
        if (isNimbus) {
            boolean readerCreated = false;
            try {
                reader = mkZk(new ZkWatcherCallBack());
                readerCreated = true;
            } finally {
                // Do not leave the writer connection open if the reader could not be created.
                if (!readerCreated) {
                    writer.close();
                }
            }
        }
        this.zkWriter = writer;
        this.zkReader = reader;
    }

    /**
     * Creates a client with an empty root and a {@link DefaultWatcherCallBack}; used only to
     * create the storm root path itself.
     */
    @SuppressWarnings("unchecked")
    private CuratorFramework mkZk() throws IOException {
        return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "",
                new DefaultWatcherCallBack(), authConf);
    }

    /**
     * Creates a client rooted at the configured {@code STORM_ZOOKEEPER_ROOT} that reports
     * events to the given watcher.
     *
     * @param watcher callback handed to {@code Zookeeper.mkClient}
     */
    @SuppressWarnings("unchecked")
    private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
        return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
                String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf);
    }

    /** Delegates to {@code Zookeeper.deleteNodeBlobstore} on the writer client. */
    @Override
    public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
        Zookeeper.deleteNodeBlobstore(zkWriter, path, nimbusHostPortInfo);
    }

    /**
     * Registers a callback to be notified of watch events.
     *
     * @param callback callback to invoke with the event type and path
     * @return a freshly generated random UUID string identifying the registration
     */
    @Override
    public String register(ZKStateChangedCallback callback) {
        String id = UUID.randomUUID().toString();
        this.callbacks.put(id, callback);
        return id;
    }

    /**
     * Removes the callback previously registered under {@code id}, if any.
     *
     * @param id id returned by {@link #register(ZKStateChangedCallback)}
     */
    @Override
    public void unregister(String id) {
        this.callbacks.remove(id);
    }

    /**
     * Creates a node in {@code CreateMode.EPHEMERAL_SEQUENTIAL} mode via the writer client.
     *
     * @return the value returned by {@code Zookeeper.createNode}
     */
    @Override
    public String create_sequential(String path, byte[] data, List<ACL> acls) {
        return Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL_SEQUENTIAL, acls);
    }

    /** Delegates to {@code Zookeeper.mkdirs} on the writer client. */
    @Override
    public void mkdirs(String path, List<ACL> acls) {
        Zookeeper.mkdirs(zkWriter, path, acls);
    }

    /** Delegates to {@code Zookeeper.deleteNode} on the writer client. */
    @Override
    public void delete_node(String path) {
        Zookeeper.deleteNode(zkWriter, path);
    }

    /**
     * Writes {@code data} to an ephemeral node at {@code path}, creating the parent path and
     * the node itself when they do not exist yet.
     */
    @Override
    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
        Zookeeper.mkdirs(zkWriter, Zookeeper.parentPath(path), acls);
        if (Zookeeper.exists(zkWriter, path, false)) {
            try {
                Zookeeper.setData(zkWriter, path, data);
            } catch (RuntimeException e) {
                // The node can disappear between the exists check and setData; in that case
                // fall back to creating it. Anything else is propagated unchanged.
                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                    Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
                } else {
                    throw e;
                }
            }
        } else {
            Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
        }
    }

    /**
     * Returns the result of {@code Zookeeper.getVersion} for {@code path} on the reader client.
     *
     * @param watch passed through to {@code Zookeeper.getVersion}
     */
    @Override
    public Integer get_version(String path, boolean watch) throws Exception {
        return Zookeeper.getVersion(zkReader, path, watch);
    }

    /** Returns the result of {@code Zookeeper.existsNode} for {@code path} on the reader client. */
    @Override
    public boolean node_exists(String path, boolean watch) {
        return Zookeeper.existsNode(zkReader, path, watch);
    }

    /** Returns the result of {@code Zookeeper.getChildren} for {@code path} on the reader client. */
    @Override
    public List<String> get_children(String path, boolean watch) {
        return Zookeeper.getChildren(zkReader, path, watch);
    }

    /**
     * Marks the storage inactive, so later watch events are ignored, and closes the
     * underlying client(s).
     */
    @Override
    public void close() {
        this.active.set(false);
        try {
            zkWriter.close();
        } finally {
            // Only Nimbus owns a distinct reader client; make sure it is closed even if
            // closing the writer failed.
            if (isNimbus) {
                zkReader.close();
            }
        }
    }

    /**
     * Writes {@code data} to a persistent node at {@code path}, creating the parent path and
     * the node itself when the node does not exist yet.
     */
    @Override
    public void set_data(String path, byte[] data, List<ACL> acls) {
        if (Zookeeper.exists(zkWriter, path, false)) {
            Zookeeper.setData(zkWriter, path, data);
        } else {
            Zookeeper.mkdirs(zkWriter, Zookeeper.parentPath(path), acls);
            try {
                Zookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, acls);
            } catch (RuntimeException e) {
                // The node can appear between the exists check and createNode; in that case
                // fall back to overwriting it. Anything else is propagated unchanged.
                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
                    Zookeeper.setData(zkWriter, path, data);
                } else {
                    throw e;
                }
            }
        }
    }

    /** Returns the result of {@code Zookeeper.getData} for {@code path} on the reader client. */
    @Override
    public byte[] get_data(String path, boolean watch) {
        return Zookeeper.getData(zkReader, path, watch);
    }

    /** Returns the result of {@code Zookeeper.getDataWithVersion} for {@code path} on the reader client. */
    @Override
    public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
        return Zookeeper.getDataWithVersion(zkReader, path, watch);
    }

    /** Stores a worker heartbeat; identical to {@link #set_data(String, byte[], List)}. */
    @Override
    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
        set_data(path, data, acls);
    }

    /** Reads a worker heartbeat via {@code Zookeeper.getData} on the reader client. */
    @Override
    public byte[] get_worker_hb(String path, boolean watch) {
        return Zookeeper.getData(zkReader, path, watch);
    }

    /** Lists worker heartbeat children; identical to {@link #get_children(String, boolean)}. */
    @Override
    public List<String> get_worker_hb_children(String path, boolean watch) {
        return get_children(path, watch);
    }

    /** Deletes a worker heartbeat; identical to {@link #delete_node(String)}. */
    @Override
    public void delete_worker_hb(String path) {
        delete_node(path);
    }

    /**
     * Registers a connection-state listener on the reader client. The supplied listener is
     * wrapped in a delegating listener that forwards every state change unchanged.
     *
     * @param listener listener to notify of connection state changes
     */
    @Override
    public void add_listener(final ConnectionStateListener listener) {
        Zookeeper.addListener(zkReader, new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                listener.stateChanged(curatorFramework, connectionState);
            }
        });
    }

    /** Delegates to {@code Zookeeper.syncPath} on the writer client. */
    @Override
    public void sync_path(String path) {
        Zookeeper.syncPath(zkWriter, path);
    }
}