// blob: 6bfe460749118ccf2d727145a2088f2a48d7ecc8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.templeton.tool;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
 * A storage implementation based on storing everything in ZooKeeper.
 * This keeps everything in a central location that is guaranteed
 * to be available and accessible.
 *
 * Data is stored with each key/value pair being a node in ZooKeeper:
 * an entry lives at {@code <type path>/<id>} and each of its fields is a
 * child znode {@code <type path>/<id>/<field name>} whose data is the
 * field value encoded with {@link #ENCODING}.
 *
 * {@link #openStorage(Configuration)} must be called before any other
 * operation; it computes the per-type root paths and opens the connection.
 */
public class ZooKeeperStorage implements TempletonStorage {

    /** Suffix (under the storage root) of the job tracking directory. */
    public static final String TRACKINGDIR = "/created";

    // Locations for each of the storage types. All are derived from the
    // configured storage root in openStorage().
    public String storage_root = null;
    public String job_path = null;
    public String job_trackingpath = null;
    public String overhead_path = null;

    /** Configuration key naming the ZooKeeper connect string. */
    public static final String ZK_HOSTS = "templeton.zookeeper.hosts";

    /** Configuration key naming the ZooKeeper session timeout. */
    public static final String ZK_SESSION_TIMEOUT
        = "templeton.zookeeper.session-timeout";

    /** Character encoding used for all field values stored in znodes. */
    public static final String ENCODING = "UTF-8";

    private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class);

    private ZooKeeper zk;

    /**
     * Open a ZooKeeper connection for the JobState.
     *
     * @param zkHosts          the ZooKeeper connect string
     * @param zkSessionTimeout the session timeout passed to ZooKeeper
     * @return a new ZooKeeper handle whose default watcher ignores all events
     * @throws IOException if the ZooKeeper client cannot be created
     */
    public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout)
        throws IOException {
        return new ZooKeeper(zkHosts,
                             zkSessionTimeout,
                             new Watcher() {
                                 @Override
                                 synchronized public void process(WatchedEvent event) {
                                     // Events are intentionally ignored.
                                 }
                             });
    }

    /**
     * Open a ZooKeeper connection for the JobState, reading the connect
     * string from {@link #ZK_HOSTS} and the session timeout from
     * {@link #ZK_SESSION_TIMEOUT} (30000 when the key is not set).
     *
     * @param conf the configuration to read connection settings from
     * @return a new ZooKeeper handle
     * @throws IOException if the ZooKeeper client cannot be created
     */
    public static ZooKeeper zkOpen(Configuration conf)
        throws IOException {
        return zkOpen(conf.get(ZK_HOSTS),
                      conf.getInt(ZK_SESSION_TIMEOUT, 30000));
    }

    public ZooKeeperStorage() {
        // No-op -- this is needed to be able to instantiate the
        // class from the name.
    }

    /**
     * Close this ZK connection. Does nothing if no connection is open.
     *
     * @throws IOException if the thread is interrupted while closing; the
     *                     thread's interrupt status is restored first
     */
    public void close()
        throws IOException {
        if (zk != null) {
            try {
                zk.close();
                zk = null;
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new IOException("Closing ZooKeeper connection", e);
            }
        }
    }

    /**
     * Start the ZooKeeper cleanup instance. A failure to start is logged
     * as a warning and otherwise ignored.
     *
     * @param config the configuration handed to the cleanup instance
     */
    public void startCleanup(Configuration config) {
        try {
            ZooKeeperCleanup.startInstance(config);
        } catch (Exception e) {
            LOG.warn("Cleanup instance didn't start.", e);
        }
    }

    /**
     * Create a node in ZooKeeper, together with any missing ancestors.
     *
     * If at least one znode was actually created, a tracking node is
     * registered for {@code Type.JOB} entries and a "created" field holding
     * the current time in milliseconds is written on the new node.
     *
     * @param type the storage type that selects the parent path
     * @param id   the id of the node to create
     * @throws IOException if the node does not exist afterwards, if the
     *                     "created" field cannot be written, or on any
     *                     ZooKeeper error or interruption
     */
    public void create(Type type, String id)
        throws IOException {
        try {
            String target = makeZnode(type, id);
            boolean wasCreated = false;
            for (String znode : getPaths(target)) {
                try {
                    zk.create(znode, new byte[0],
                              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    wasCreated = true;
                } catch (KeeperException.NodeExistsException e) {
                    // Expected: this ancestor (or the node itself) is
                    // already present, so there is nothing to do.
                }
            }
            if (wasCreated) {
                try {
                    // Really not sure if this should go here.  Will have
                    // to see how the storage mechanism evolves.
                    if (type.equals(Type.JOB)) {
                        JobStateTracker jt = new JobStateTracker(id, zk, false,
                                                                 job_trackingpath);
                        jt.create();
                    }
                } catch (Exception e) {
                    LOG.warn("Error tracking: " + e.getMessage(), e);
                    // If we couldn't create the tracker node, don't
                    // create the main node.  The exists() check below
                    // then reports the failure to the caller.
                    zk.delete(target, -1);
                }
            }
            if (zk.exists(target, false) == null)
                throw new IOException("Unable to create " + target);
            if (wasCreated) {
                try {
                    saveField(type, id, "created",
                              Long.toString(System.currentTimeMillis()));
                } catch (NotFoundException nfe) {
                    // Wow, something's really wrong.
                    throw new IOException("Couldn't write to node " + id, nfe);
                }
            }
        } catch (KeeperException e) {
            throw new IOException("Creating " + id, e);
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException("Creating " + id, e);
        }
    }

    /**
     * Get the path based on the job type.
     *
     * @param type the storage type
     * @return the job path for JOB, the job tracking path for JOBTRACKING,
     *         and the overhead path for every other type
     */
    public String getPath(Type type) {
        String typepath = overhead_path;
        switch (type) {
        case JOB:
            typepath = job_path;
            break;
        case JOBTRACKING:
            typepath = job_trackingpath;
            break;
        }
        return typepath;
    }

    /**
     * Expand a path into itself preceded by each of its ancestors, in
     * top-down order. For example {@code "/a/b/c"} yields
     * {@code "/a"}, {@code "/a/b"}, {@code "/a/b/c"}.
     *
     * @param fullpath the path to expand
     * @return the ancestor prefixes followed by {@code fullpath}; a path
     *         shorter than two characters is returned as the only element
     */
    public static String[] getPaths(String fullpath) {
        ArrayList<String> paths = new ArrayList<String>();
        if (fullpath.length() < 2) {
            paths.add(fullpath);
        } else {
            // Start searching after the first character so the leading
            // "/" never produces an empty prefix.
            int location = 0;
            while ((location = fullpath.indexOf("/", location + 1)) > 0) {
                paths.add(fullpath.substring(0, location));
            }
            paths.add(fullpath);
        }
        return paths.toArray(new String[paths.size()]);
    }

    /**
     * A helper method that sets a field value, creating the field znode
     * if it does not exist and overwriting its data (any version) if it does.
     *
     * @param type the storage type of the owning node
     * @param id   the id of the owning node
     * @param name the field name
     * @param val  the field value; must not be null
     * @throws KeeperException on a ZooKeeper error
     * @throws UnsupportedEncodingException if {@link #ENCODING} is unsupported
     * @throws InterruptedException if the ZooKeeper call is interrupted
     */
    private void setFieldData(Type type, String id, String name, String val)
        throws KeeperException, UnsupportedEncodingException, InterruptedException {
        String fieldZnode = makeFieldZnode(type, id, name);
        byte[] data = val.getBytes(ENCODING);
        try {
            zk.create(fieldZnode,
                      data,
                      Ids.OPEN_ACL_UNSAFE,
                      CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            // The field already exists: overwrite it regardless of version.
            zk.setData(fieldZnode, data, -1);
        }
    }

    /**
     * Make a ZK path to the named field.
     *
     * @return {@code makeZnode(type, id) + "/" + name}
     */
    public String makeFieldZnode(Type type, String id, String name) {
        return makeZnode(type, id) + "/" + name;
    }

    /**
     * Make a ZK path to job
     *
     * @return {@code getPath(type) + "/" + id}
     */
    public String makeZnode(Type type, String id) {
        return getPath(type) + "/" + id;
    }

    /**
     * Save a single field on a node, creating the node first if needed.
     * A null value is ignored: nothing is created or written.
     *
     * @throws NotFoundException if creating the node or writing the field
     *                           fails for any reason
     */
    @Override
    public void saveField(Type type, String id, String key, String val)
        throws NotFoundException {
        try {
            if (val != null) {
                create(type, id);
                setFieldData(type, id, key, val);
            }
        } catch (Exception e) {
            throw new NotFoundException("Writing " + key + ": " + val + ", "
                                        + e.getMessage());
        }
    }

    /**
     * Read a single field of a node.
     *
     * @return the decoded field value, or null if it cannot be read
     *         for any reason
     */
    @Override
    public String getField(Type type, String id, String key) {
        try {
            byte[] b = zk.getData(makeFieldZnode(type, id, key), false, null);
            return new String(b, ENCODING);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * Read all fields of a node.
     *
     * @return a map from field name to decoded value; if an error occurs
     *         part way through, the fields read so far are returned
     */
    @Override
    public Map<String, String> getFields(Type type, String id) {
        HashMap<String, String> map = new HashMap<String, String>();
        try {
            for (String node : zk.getChildren(makeZnode(type, id), false)) {
                byte[] b = zk.getData(makeFieldZnode(type, id, node),
                                      false, null);
                map.put(node, new String(b, ENCODING));
            }
        } catch (Exception e) {
            // Best effort: fall through and return whatever was collected.
        }
        return map;
    }

    /**
     * Delete a node and all of its field znodes.
     *
     * @return true when the node and every field were deleted
     * @throws NotFoundException if the children cannot be listed, or if a
     *                           field or the node itself cannot be deleted
     *                           (for example because another client removed
     *                           it concurrently); the message names the
     *                           step that failed
     */
    @Override
    public boolean delete(Type type, String id) throws NotFoundException {
        String znode = makeZnode(type, id);

        List<String> children;
        try {
            children = zk.getChildren(znode, false);
        } catch (Exception e) {
            // Error getting children of node -- probably node has been deleted
            throw new NotFoundException("Couldn't get children of " + znode);
        }

        for (String child : children) {
            String fieldZnode = makeFieldZnode(type, id, child);
            try {
                zk.delete(fieldZnode, -1);
            } catch (Exception e) {
                // Other nodes may be trying to delete this at the same time.
                throw new NotFoundException("Couldn't delete " + fieldZnode);
            }
        }

        try {
            zk.delete(znode, -1);
        } catch (Exception e) {
            // Same thing -- might have been deleted by another node.
            throw new NotFoundException("Couldn't delete " + znode);
        }
        return true;
    }

    /**
     * List the ids stored under every storage type.
     *
     * @return the concatenation of {@link #getAllForType(Type)} over all types
     */
    @Override
    public List<String> getAll() {
        ArrayList<String> allNodes = new ArrayList<String>();
        for (Type type : Type.values()) {
            allNodes.addAll(getAllForType(type));
        }
        return allNodes;
    }

    /**
     * List the ids stored under one storage type.
     *
     * @return the child names of the type's path, or an empty list if they
     *         cannot be listed
     */
    @Override
    public List<String> getAllForType(Type type) {
        try {
            return zk.getChildren(getPath(type), false);
        } catch (Exception e) {
            return new ArrayList<String>();
        }
    }

    /**
     * List the ids, across all storage types, whose field {@code key}
     * equals {@code value}.
     */
    @Override
    public List<String> getAllForKey(String key, String value) {
        ArrayList<String> allNodes = new ArrayList<String>();
        try {
            for (Type type : Type.values()) {
                allNodes.addAll(getAllForTypeAndKey(type, key, value));
            }
        } catch (Exception e) {
            LOG.info("Couldn't find children.");
        }
        return allNodes;
    }

    /**
     * List the ids of one storage type whose field {@code key} equals
     * {@code value}.
     *
     * @return the matching ids; if an error occurs while scanning, the
     *         matches found so far are returned
     */
    @Override
    public List<String> getAllForTypeAndKey(Type type, String key, String value) {
        ArrayList<String> allNodes = new ArrayList<String>();
        try {
            for (String id : zk.getChildren(getPath(type), false)) {
                // getChildren() yields bare child names, so absolute paths
                // must be rebuilt before querying ZooKeeper again.
                for (String field : zk.getChildren(makeZnode(type, id), false)) {
                    if (field.equals(key)) {
                        byte[] b = zk.getData(makeFieldZnode(type, id, field),
                                              false, null);
                        if (b != null && new String(b, ENCODING).equals(value)) {
                            allNodes.add(id);
                        }
                    }
                }
            }
        } catch (Exception e) {
            // Log and go to the next type -- this one might not exist
            LOG.info("Couldn't find children of " + getPath(type));
        }
        return allNodes;
    }

    /**
     * Compute the storage paths from the configured storage root and open
     * the ZooKeeper connection if one is not already open.
     *
     * @throws IOException if the ZooKeeper client cannot be created
     */
    @Override
    public void openStorage(Configuration config) throws IOException {
        storage_root = config.get(STORAGE_ROOT);
        job_path = storage_root + "/jobs";
        job_trackingpath = storage_root + TRACKINGDIR;
        overhead_path = storage_root + "/overhead";

        if (zk == null) {
            zk = zkOpen(config);
        }
    }

    /**
     * Close the storage by closing the ZooKeeper connection.
     *
     * @throws IOException if closing the connection is interrupted
     */
    @Override
    public void closeStorage() throws IOException {
        close();
    }
}