blob: de18241a392f5a68dfca57862b8d846cf25d1a56 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.hbase.snapshot;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hcatalog.hbase.snapshot.transaction.thrift.StoreFamilyRevision;
import org.apache.hcatalog.hbase.snapshot.transaction.thrift.StoreFamilyRevisionList;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ZKUtil {
private int DEFAULT_SESSION_TIMEOUT = 1000000;
private ZooKeeper zkSession;
private String baseDir;
private String connectString;
private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class);
static enum UpdateMode {
APPEND, REMOVE, KEEP_ALIVE
}
;
ZKUtil(String connection, String baseDir) {
this.connectString = connection;
this.baseDir = baseDir;
}
/**
* This method creates znodes related to table.
*
* @param table The name of the table.
* @param families The list of column families of the table.
* @throws IOException
*/
void setUpZnodesForTable(String table, List<String> families)
throws IOException {
String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir, table);
ensurePathExists(transactionDataTablePath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
for (String cf : families) {
String runningDataPath = PathUtil.getRunningTxnInfoPath(
this.baseDir, table, cf);
ensurePathExists(runningDataPath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
String abortDataPath = PathUtil.getAbortInformationPath(
this.baseDir, table, cf);
ensurePathExists(abortDataPath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}
/**
* This method ensures that a given path exists in zookeeper. If the path
* does not exists, it creates one.
*
* @param path The path of znode that is required to exist.
* @param data The data to be associated with the znode.
* @param acl The ACLs required.
* @param flags The CreateMode for the znode.
* @throws IOException
*/
void ensurePathExists(String path, byte[] data, List<ACL> acl,
CreateMode flags) throws IOException {
String[] dirs = path.split("/");
String parentPath = "";
for (String subDir : dirs) {
if (subDir.equals("") == false) {
parentPath = parentPath + "/" + subDir;
try {
Stat stat = getSession().exists(parentPath, false);
if (stat == null) {
getSession().create(parentPath, data, acl, flags);
}
} catch (Exception e) {
throw new IOException("Exception while creating path "
+ parentPath, e);
}
}
}
}
/**
* This method returns a list of columns of a table which were used in any
* of the transactions.
*
* @param tableName The name of table.
* @return List<String> The list of column families in table.
* @throws IOException
*/
List<String> getColumnFamiliesOfTable(String tableName) throws IOException {
String path = PathUtil.getTxnDataPath(baseDir, tableName);
List<String> children = null;
List<String> columnFamlies = new ArrayList<String>();
try {
children = getSession().getChildren(path, false);
} catch (KeeperException e) {
LOG.warn("Caught: ", e);
throw new IOException("Exception while obtaining columns of table.", e);
} catch (InterruptedException e) {
LOG.warn("Caught: ", e);
throw new IOException("Exception while obtaining columns of table.", e);
}
for (String child : children) {
if ((child.contains("idgen") == false)
&& (child.contains("_locknode_") == false)) {
columnFamlies.add(child);
}
}
return columnFamlies;
}
/**
* This method returns a time stamp for use by the transactions.
*
* @return long The current timestamp in zookeeper.
* @throws IOException
*/
long getTimeStamp() throws IOException {
long timeStamp;
Stat stat;
String clockPath = PathUtil.getClockPath(this.baseDir);
ensurePathExists(clockPath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
try {
getSession().exists(clockPath, false);
stat = getSession().setData(clockPath, null, -1);
} catch (KeeperException e) {
LOG.warn("Caught: ", e);
throw new IOException("Exception while obtaining timestamp ", e);
} catch (InterruptedException e) {
LOG.warn("Caught: ", e);
throw new IOException("Exception while obtaining timestamp ", e);
}
timeStamp = stat.getMtime();
return timeStamp;
}
/**
* This method returns the next revision number to be used for any
* transaction purposes.
*
* @param tableName The name of the table.
* @return revision number The revision number last used by any transaction.
* @throws IOException
*/
long nextId(String tableName) throws IOException {
String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
String lockNode = PathUtil.getLockManagementNode(idNode);
ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
long id = idf.obtainID();
return id;
}
/**
* The latest used revision id of the table.
*
* @param tableName The name of the table.
* @return the long The revision number to use by any transaction.
* @throws IOException Signals that an I/O exception has occurred.
*/
long currentID(String tableName) throws IOException {
String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
String lockNode = PathUtil.getLockManagementNode(idNode);
ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
long id = idf.readID();
return id;
}
/**
* This methods retrieves the list of transaction information associated
* with each column/column family of a table.
*
* @param path The znode path
* @return List of FamilyRevision The list of transactions in the given path.
* @throws IOException
*/
List<FamilyRevision> getTransactionList(String path)
throws IOException {
byte[] data = getRawData(path, new Stat());
ArrayList<FamilyRevision> wtxnList = new ArrayList<FamilyRevision>();
if (data == null) {
return wtxnList;
}
StoreFamilyRevisionList txnList = new StoreFamilyRevisionList();
deserialize(txnList, data);
Iterator<StoreFamilyRevision> itr = txnList.getRevisionListIterator();
while (itr.hasNext()) {
StoreFamilyRevision wtxn = itr.next();
wtxnList.add(new FamilyRevision(wtxn.getRevision(), wtxn
.getTimestamp()));
}
return wtxnList;
}
/**
* This method returns the data associated with the path in zookeeper.
*
* @param path The znode path
* @param stat Zookeeper stat
* @return byte array The data stored in the znode.
* @throws IOException
*/
byte[] getRawData(String path, Stat stat) throws IOException {
byte[] data = null;
try {
data = getSession().getData(path, false, stat);
} catch (Exception e) {
throw new IOException(
"Exception while obtaining raw data from zookeeper path "
+ path, e);
}
return data;
}
/**
* This method created the basic znodes in zookeeper for revision
* management.
*
* @throws IOException
*/
void createRootZNodes() throws IOException {
String txnBaseNode = PathUtil.getTransactionBasePath(this.baseDir);
String clockNode = PathUtil.getClockPath(this.baseDir);
ensurePathExists(txnBaseNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
ensurePathExists(clockNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
/**
* This method closes the zookeeper session.
*/
void closeZKConnection() {
if (zkSession != null) {
try {
zkSession.close();
} catch (InterruptedException e) {
LOG.warn("Close failed: ", e);
}
zkSession = null;
LOG.info("Disconnected to ZooKeeper");
}
}
/**
* This method returns a zookeeper session. If the current session is closed,
* then a new session is created.
*
* @return ZooKeeper An instance of zookeeper client.
* @throws IOException
*/
ZooKeeper getSession() throws IOException {
if (zkSession == null || zkSession.getState() == States.CLOSED) {
synchronized (this) {
if (zkSession == null || zkSession.getState() == States.CLOSED) {
zkSession = new ZooKeeper(this.connectString,
this.DEFAULT_SESSION_TIMEOUT, new ZKWatcher());
}
}
}
return zkSession;
}
/**
* This method updates the transaction data related to a znode.
*
* @param path The path to the transaction data.
* @param updateTx The FamilyRevision to be updated.
* @param mode The mode to update like append, update, remove.
* @throws IOException
*/
void updateData(String path, FamilyRevision updateTx, UpdateMode mode)
throws IOException {
if (updateTx == null) {
throw new IOException(
"The transaction to be updated found to be null.");
}
List<FamilyRevision> currentData = getTransactionList(path);
List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
boolean dataFound = false;
long updateVersion = updateTx.getRevision();
for (FamilyRevision tranx : currentData) {
if (tranx.getRevision() != updateVersion) {
newData.add(tranx);
} else {
dataFound = true;
}
}
switch (mode) {
case REMOVE:
if (dataFound == false) {
throw new IOException(
"The transaction to be removed not found in the data.");
}
LOG.info("Removed trasaction : " + updateTx.toString());
break;
case KEEP_ALIVE:
if (dataFound == false) {
throw new IOException(
"The transaction to be kept alove not found in the data. It might have been expired.");
}
newData.add(updateTx);
LOG.info("keep alive of transaction : " + updateTx.toString());
break;
case APPEND:
if (dataFound == true) {
throw new IOException(
"The data to be appended already exists.");
}
newData.add(updateTx);
LOG.info("Added transaction : " + updateTx.toString());
break;
}
// For serialization purposes.
List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
for (FamilyRevision wtxn : newData) {
StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
wtxn.getExpireTimestamp());
newTxnList.add(newTxn);
}
StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
byte[] newByteData = serialize(wtxnList);
Stat stat = null;
try {
stat = zkSession.setData(path, newByteData, -1);
} catch (KeeperException e) {
throw new IOException(
"Exception while updating trasactional data. ", e);
} catch (InterruptedException e) {
throw new IOException(
"Exception while updating trasactional data. ", e);
}
if (stat != null) {
LOG.info("Transaction list stored at " + path + ".");
}
}
/**
* Refresh transactions on a given transaction data path.
*
* @param path The path to the transaction data.
* @throws IOException Signals that an I/O exception has occurred.
*/
void refreshTransactions(String path) throws IOException {
List<FamilyRevision> currentData = getTransactionList(path);
List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
for (FamilyRevision tranx : currentData) {
if (tranx.getExpireTimestamp() > getTimeStamp()) {
newData.add(tranx);
}
}
if (newData.equals(currentData) == false) {
List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
for (FamilyRevision wtxn : newData) {
StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
wtxn.getExpireTimestamp());
newTxnList.add(newTxn);
}
StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
byte[] newByteData = serialize(wtxnList);
try {
zkSession.setData(path, newByteData, -1);
} catch (KeeperException e) {
throw new IOException(
"Exception while updating trasactional data. ", e);
} catch (InterruptedException e) {
throw new IOException(
"Exception while updating trasactional data. ", e);
}
}
}
/**
* Delete table znodes.
*
* @param tableName the hbase table name
* @throws IOException Signals that an I/O exception has occurred.
*/
void deleteZNodes(String tableName) throws IOException {
String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir,
tableName);
deleteRecursively(transactionDataTablePath);
}
void deleteRecursively(String path) throws IOException {
try {
List<String> children = getSession().getChildren(path, false);
if (children.size() != 0) {
for (String child : children) {
deleteRecursively(path + "/" + child);
}
}
getSession().delete(path, -1);
} catch (KeeperException e) {
throw new IOException(
"Exception while deleting path " + path + ".", e);
} catch (InterruptedException e) {
throw new IOException(
"Exception while deleting path " + path + ".", e);
}
}
/**
* This method serializes a given instance of TBase object.
*
* @param obj An instance of TBase
* @return byte array The serialized data.
* @throws IOException
*/
static byte[] serialize(TBase obj) throws IOException {
if (obj == null)
return new byte[0];
try {
TSerializer serializer = new TSerializer(
new TBinaryProtocol.Factory());
byte[] bytes = serializer.serialize(obj);
return bytes;
} catch (Exception e) {
throw new IOException("Serialization error: ", e);
}
}
/**
* This method deserializes the given byte array into the TBase object.
*
* @param obj An instance of TBase
* @param data Output of deserialization.
* @throws IOException
*/
static void deserialize(TBase obj, byte[] data) throws IOException {
if (data == null || data.length == 0)
return;
try {
TDeserializer deserializer = new TDeserializer(
new TBinaryProtocol.Factory());
deserializer.deserialize(obj, data);
} catch (Exception e) {
throw new IOException("Deserialization error: " + e.getMessage(), e);
}
}
private class ZKWatcher implements Watcher {
public void process(WatchedEvent event) {
switch (event.getState()) {
case Expired:
LOG.info("The client session has expired. Try opening a new "
+ "session and connecting again.");
zkSession = null;
break;
default:
}
}
}
}