blob: 50adc9f509055be92efbc6ada4f7390685b0782a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.hbase.snapshot;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hcatalog.hbase.snapshot.lock.LockListener;
import org.apache.hcatalog.hbase.snapshot.lock.WriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The service for providing revision management to Hbase tables.
*/
public class ZKBasedRevisionManager implements RevisionManager {
private static final Logger LOG = LoggerFactory.getLogger(ZKBasedRevisionManager.class);
private String zkHostList;
private String baseDir;
private ZKUtil zkUtil;
private long writeTxnTimeout;
/*
* @see org.apache.hcatalog.hbase.snapshot.RevisionManager#initialize()
*/
@Override
public void initialize(Configuration conf) {
conf = new Configuration(conf);
if (conf.get(RMConstants.ZOOKEEPER_HOSTLIST) == null) {
String zkHostList = conf.get(HConstants.ZOOKEEPER_QUORUM);
int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
String[] splits = zkHostList.split(",");
StringBuffer sb = new StringBuffer();
for (String split : splits) {
sb.append(split);
sb.append(':');
sb.append(port);
sb.append(',');
}
sb.deleteCharAt(sb.length() - 1);
conf.set(RMConstants.ZOOKEEPER_HOSTLIST, sb.toString());
}
this.zkHostList = conf.get(RMConstants.ZOOKEEPER_HOSTLIST);
this.baseDir = conf.get(RMConstants.ZOOKEEPER_DATADIR);
this.writeTxnTimeout = Long.parseLong(conf.get(RMConstants.WRITE_TRANSACTION_TIMEOUT));
}
/**
* Open a ZooKeeper connection
* @throws java.io.IOException
*/
public void open() throws IOException {
zkUtil = new ZKUtil(zkHostList, this.baseDir);
zkUtil.createRootZNodes();
LOG.info("Created root znodes for revision manager.");
}
/**
* Close Zookeeper connection
*/
public void close() {
zkUtil.closeZKConnection();
}
private void checkInputParams(String table, List<String> families) {
if (table == null) {
throw new IllegalArgumentException(
"The table name must be specified for reading.");
}
if (families == null || families.isEmpty()) {
throw new IllegalArgumentException(
"At least one column family should be specified for reading.");
}
}
@Override
public void createTable(String table, List<String> columnFamilies) throws IOException {
zkUtil.createRootZNodes();
zkUtil.setUpZnodesForTable(table, columnFamilies);
}
@Override
public void dropTable(String table) throws IOException {
zkUtil.deleteZNodes(table);
}
/* @param table
/* @param families
/* @param keepAlive
/* @return
/* @throws IOException
* @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long)
*/
public Transaction beginWriteTransaction(String table,
List<String> families, long keepAlive) throws IOException {
checkInputParams(table, families);
zkUtil.setUpZnodesForTable(table, families);
long nextId = zkUtil.nextId(table);
long expireTimestamp = zkUtil.getTimeStamp();
Transaction transaction = new Transaction(table, families, nextId,
expireTimestamp);
if (keepAlive != -1) {
transaction.setKeepAlive(keepAlive);
} else {
transaction.setKeepAlive(writeTxnTimeout);
}
refreshTransactionList(transaction.getTableName());
String lockPath = prepareLockNode(table);
WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
Ids.OPEN_ACL_UNSAFE);
RMLockListener myLockListener = new RMLockListener();
wLock.setLockListener(myLockListener);
try {
boolean lockGrabbed = wLock.lock();
if (lockGrabbed == false) {
//TO DO : Let this request queue up and try obtaining lock.
throw new IOException(
"Unable to obtain lock while beginning transaction. "
+ transaction.toString());
} else {
List<String> colFamilies = transaction.getColumnFamilies();
FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
for (String cfamily : colFamilies) {
String path = PathUtil.getRunningTxnInfoPath(
baseDir, table, cfamily);
zkUtil.updateData(path, revisionData,
ZKUtil.UpdateMode.APPEND);
}
}
} catch (KeeperException e) {
throw new IOException("Exception while obtaining lock.", e);
} catch (InterruptedException e) {
throw new IOException("Exception while obtaining lock.", e);
} finally {
wLock.unlock();
}
return transaction;
}
/* @param table The table name.
/* @param families The column families involved in the transaction.
/* @return transaction The transaction which was started.
/* @throws IOException
* @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List)
*/
public Transaction beginWriteTransaction(String table, List<String> families)
throws IOException {
return beginWriteTransaction(table, families, -1);
}
/**
* This method commits a write transaction.
* @param transaction The revision information associated with transaction.
* @throws java.io.IOException
*/
public void commitWriteTransaction(Transaction transaction) throws IOException {
refreshTransactionList(transaction.getTableName());
String lockPath = prepareLockNode(transaction.getTableName());
WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
Ids.OPEN_ACL_UNSAFE);
RMLockListener myLockListener = new RMLockListener();
wLock.setLockListener(myLockListener);
try {
boolean lockGrabbed = wLock.lock();
if (lockGrabbed == false) {
//TO DO : Let this request queue up and try obtaining lock.
throw new IOException(
"Unable to obtain lock while commiting transaction. "
+ transaction.toString());
} else {
String tableName = transaction.getTableName();
List<String> colFamilies = transaction.getColumnFamilies();
FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
for (String cfamily : colFamilies) {
String path = PathUtil.getRunningTxnInfoPath(
baseDir, tableName, cfamily);
zkUtil.updateData(path, revisionData,
ZKUtil.UpdateMode.REMOVE);
}
}
} catch (KeeperException e) {
throw new IOException("Exception while obtaining lock.", e);
} catch (InterruptedException e) {
throw new IOException("Exception while obtaining lock.", e);
} finally {
wLock.unlock();
}
LOG.info("Write Transaction committed: " + transaction.toString());
}
/**
* This method aborts a write transaction.
* @param transaction
* @throws java.io.IOException
*/
public void abortWriteTransaction(Transaction transaction) throws IOException {
refreshTransactionList(transaction.getTableName());
String lockPath = prepareLockNode(transaction.getTableName());
WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
Ids.OPEN_ACL_UNSAFE);
RMLockListener myLockListener = new RMLockListener();
wLock.setLockListener(myLockListener);
try {
boolean lockGrabbed = wLock.lock();
if (lockGrabbed == false) {
//TO DO : Let this request queue up and try obtaining lock.
throw new IOException(
"Unable to obtain lock while aborting transaction. "
+ transaction.toString());
} else {
String tableName = transaction.getTableName();
List<String> colFamilies = transaction.getColumnFamilies();
FamilyRevision revisionData = transaction
.getFamilyRevisionInfo();
for (String cfamily : colFamilies) {
String path = PathUtil.getRunningTxnInfoPath(
baseDir, tableName, cfamily);
zkUtil.updateData(path, revisionData,
ZKUtil.UpdateMode.REMOVE);
path = PathUtil.getAbortInformationPath(baseDir,
tableName, cfamily);
zkUtil.updateData(path, revisionData,
ZKUtil.UpdateMode.APPEND);
}
}
} catch (KeeperException e) {
throw new IOException("Exception while obtaining lock.", e);
} catch (InterruptedException e) {
throw new IOException("Exception while obtaining lock.", e);
} finally {
wLock.unlock();
}
LOG.info("Write Transaction aborted: " + transaction.toString());
}
/* @param transaction
/* @throws IOException
* @see org.apache.hcatalog.hbase.snapshot.RevsionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction)
*/
public void keepAlive(Transaction transaction)
throws IOException {
refreshTransactionList(transaction.getTableName());
transaction.keepAliveTransaction();
String lockPath = prepareLockNode(transaction.getTableName());
WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
Ids.OPEN_ACL_UNSAFE);
RMLockListener myLockListener = new RMLockListener();
wLock.setLockListener(myLockListener);
try {
boolean lockGrabbed = wLock.lock();
if (lockGrabbed == false) {
//TO DO : Let this request queue up and try obtaining lock.
throw new IOException(
"Unable to obtain lock for keep alive of transaction. "
+ transaction.toString());
} else {
String tableName = transaction.getTableName();
List<String> colFamilies = transaction.getColumnFamilies();
FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
for (String cfamily : colFamilies) {
String path = PathUtil.getRunningTxnInfoPath(
baseDir, tableName, cfamily);
zkUtil.updateData(path, revisionData,
ZKUtil.UpdateMode.KEEP_ALIVE);
}
}
} catch (KeeperException e) {
throw new IOException("Exception while obtaining lock.", e);
} catch (InterruptedException e) {
throw new IOException("Exception while obtaining lock.", e);
} finally {
wLock.unlock();
}
}
/* This method allows the user to create latest snapshot of a
/* table.
/* @param tableName The table whose snapshot is being created.
/* @return TableSnapshot An instance of TableSnaphot
/* @throws IOException
* @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String)
*/
public TableSnapshot createSnapshot(String tableName) throws IOException {
refreshTransactionList(tableName);
long latestID = zkUtil.currentID(tableName);
HashMap<String, Long> cfMap = new HashMap<String, Long>();
List<String> columnFamilyNames = zkUtil.getColumnFamiliesOfTable(tableName);
for (String cfName : columnFamilyNames) {
String cfPath = PathUtil.getRunningTxnInfoPath(baseDir, tableName, cfName);
List<FamilyRevision> tranxList = zkUtil.getTransactionList(cfPath);
long version;
if (!tranxList.isEmpty()) {
Collections.sort(tranxList);
// get the smallest running Transaction ID
long runningVersion = tranxList.get(0).getRevision();
version = runningVersion - 1;
} else {
version = latestID;
}
cfMap.put(cfName, version);
}
TableSnapshot snapshot = new TableSnapshot(tableName, cfMap, latestID);
LOG.debug("Created snapshot For table: " + tableName + " snapshot: " + snapshot);
return snapshot;
}
/* This method allows the user to create snapshot of a
/* table with a given revision number.
/* @param tableName
/* @param revision
/* @return TableSnapshot
/* @throws IOException
* @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String, long)
*/
public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
long currentID = zkUtil.currentID(tableName);
if (revision > currentID) {
throw new IOException(
"The revision specified in the snapshot is higher than the current revision of the table.");
}
refreshTransactionList(tableName);
HashMap<String, Long> cfMap = new HashMap<String, Long>();
List<String> columnFamilies = zkUtil.getColumnFamiliesOfTable(tableName);
for (String cf : columnFamilies) {
cfMap.put(cf, revision);
}
return new TableSnapshot(tableName, cfMap, revision);
}
/**
* Get the list of in-progress Transactions for a column family
* @param table the table name
* @param columnFamily the column family name
* @return a list of in-progress WriteTransactions
* @throws java.io.IOException
*/
List<FamilyRevision> getRunningTransactions(String table,
String columnFamily) throws IOException {
String path = PathUtil.getRunningTxnInfoPath(baseDir, table,
columnFamily);
return zkUtil.getTransactionList(path);
}
@Override
public List<FamilyRevision> getAbortedWriteTransactions(String table,
String columnFamily) throws IOException {
String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
return zkUtil.getTransactionList(path);
}
private void refreshTransactionList(String tableName) throws IOException {
String lockPath = prepareLockNode(tableName);
WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
Ids.OPEN_ACL_UNSAFE);
RMLockListener myLockListener = new RMLockListener();
wLock.setLockListener(myLockListener);
try {
boolean lockGrabbed = wLock.lock();
if (lockGrabbed == false) {
//TO DO : Let this request queue up and try obtaining lock.
throw new IOException(
"Unable to obtain lock while refreshing transactions of table "
+ tableName + ".");
} else {
List<String> cfPaths = zkUtil
.getColumnFamiliesOfTable(tableName);
for (String cf : cfPaths) {
String runningDataPath = PathUtil.getRunningTxnInfoPath(
baseDir, tableName, cf);
zkUtil.refreshTransactions(runningDataPath);
}
}
} catch (KeeperException e) {
throw new IOException("Exception while obtaining lock.", e);
} catch (InterruptedException e) {
throw new IOException("Exception while obtaining lock.", e);
} finally {
wLock.unlock();
}
}
private String prepareLockNode(String tableName) throws IOException {
String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName);
String lockPath = PathUtil.getLockManagementNode(txnDataPath);
zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
return lockPath;
}
/*
* This class is a listener class for the locks used in revision management.
* TBD: Use the following class to signal that that the lock is actually
* been granted.
*/
class RMLockListener implements LockListener {
/*
* @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
*/
@Override
public void lockAcquired() {
}
/*
* @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
*/
@Override
public void lockReleased() {
}
}
}