| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hcatalog.hbase.snapshot; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hcatalog.hbase.snapshot.lock.LockListener; |
| import org.apache.hcatalog.hbase.snapshot.lock.WriteLock; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.ZooDefs.Ids; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The service for providing revision management to HBase tables. |
| */ |
| public class ZKBasedRevisionManager implements RevisionManager{ |
| |
| // Logger for this class. |
| private static final Logger LOG = LoggerFactory.getLogger(ZKBasedRevisionManager.class); |
| // ZooKeeper host list; read from RMConstants.ZOOKEEPER_HOSTLIST in initialize() and passed to ZKUtil in open(). |
| private String zkHostList; |
| // Base directory; read from RMConstants.ZOOKEEPER_DATADIR in initialize() and used when building znode paths. |
| private String baseDir; |
| // ZooKeeper helper; created in open(), so it is null until open() has been called. |
| private ZKUtil zkUtil; |
| // Keep-alive applied to a write transaction when the caller passes -1; parsed from RMConstants.WRITE_TRANSACTION_TIMEOUT. |
| private long writeTxnTimeout; |
| |
| |
| /* |
| * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#initialize() |
| */ |
| @Override |
| public void initialize(Configuration conf) { |
| conf = new Configuration(conf); |
| if (conf.get(RMConstants.ZOOKEEPER_HOSTLIST) == null) { |
| String zkHostList = conf.get(HConstants.ZOOKEEPER_QUORUM); |
| int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, |
| HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); |
| String[] splits = zkHostList.split(","); |
| StringBuffer sb = new StringBuffer(); |
| for (String split : splits) { |
| sb.append(split); |
| sb.append(':'); |
| sb.append(port); |
| sb.append(','); |
| } |
| sb.deleteCharAt(sb.length() - 1); |
| conf.set(RMConstants.ZOOKEEPER_HOSTLIST, sb.toString()); |
| } |
| this.zkHostList = conf.get(RMConstants.ZOOKEEPER_HOSTLIST); |
| this.baseDir = conf.get(RMConstants.ZOOKEEPER_DATADIR); |
| this.writeTxnTimeout = Long.parseLong(conf.get(RMConstants.WRITE_TRANSACTION_TIMEOUT)); |
| } |
| |
| /** |
| * Open a ZooKeeper connection |
| * @throws java.io.IOException |
| */ |
| |
| public void open() throws IOException { |
| zkUtil = new ZKUtil(zkHostList, this.baseDir); |
| zkUtil.createRootZNodes(); |
| LOG.info("Created root znodes for revision manager."); |
| } |
| |
| /** |
| * Close Zookeeper connection |
| */ |
| public void close() { |
| zkUtil.closeZKConnection(); |
| } |
| |
| private void checkInputParams(String table, List<String> families) { |
| if (table == null) { |
| throw new IllegalArgumentException( |
| "The table name must be specified for reading."); |
| } |
| if (families == null || families.isEmpty()) { |
| throw new IllegalArgumentException( |
| "At least one column family should be specified for reading."); |
| } |
| } |
| |
| @Override |
| public void createTable(String table, List<String> columnFamilies) throws IOException { |
| zkUtil.createRootZNodes(); |
| zkUtil.setUpZnodesForTable(table, columnFamilies); |
| } |
| |
| @Override |
| public void dropTable(String table) throws IOException { |
| zkUtil.deleteZNodes(table); |
| } |
| |
| /* @param table |
| /* @param families |
| /* @param keepAlive |
| /* @return |
| /* @throws IOException |
| * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long) |
| */ |
| public Transaction beginWriteTransaction(String table, |
| List<String> families, long keepAlive) throws IOException { |
| |
| checkInputParams(table, families); |
| zkUtil.setUpZnodesForTable(table, families); |
| long nextId = zkUtil.nextId(table); |
| long expireTimestamp = zkUtil.getTimeStamp(); |
| Transaction transaction = new Transaction(table, families, nextId, |
| expireTimestamp); |
| if (keepAlive != -1) { |
| transaction.setKeepAlive(keepAlive); |
| } else { |
| transaction.setKeepAlive(writeTxnTimeout); |
| } |
| |
| refreshTransactionList(transaction.getTableName()); |
| String lockPath = prepareLockNode(table); |
| WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, |
| Ids.OPEN_ACL_UNSAFE); |
| RMLockListener myLockListener = new RMLockListener(); |
| wLock.setLockListener(myLockListener); |
| try { |
| boolean lockGrabbed = wLock.lock(); |
| if (lockGrabbed == false) { |
| //TO DO : Let this request queue up and try obtaining lock. |
| throw new IOException( |
| "Unable to obtain lock while beginning transaction. " |
| + transaction.toString()); |
| } else { |
| List<String> colFamilies = transaction.getColumnFamilies(); |
| FamilyRevision revisionData = transaction.getFamilyRevisionInfo(); |
| for (String cfamily : colFamilies) { |
| String path = PathUtil.getRunningTxnInfoPath( |
| baseDir, table, cfamily); |
| zkUtil.updateData(path, revisionData, |
| ZKUtil.UpdateMode.APPEND); |
| } |
| } |
| } catch (KeeperException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| } catch (InterruptedException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| } |
| finally { |
| wLock.unlock(); |
| } |
| |
| return transaction; |
| } |
| |
| /* @param table The table name. |
| /* @param families The column families involved in the transaction. |
| /* @return transaction The transaction which was started. |
| /* @throws IOException |
| * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List) |
| */ |
| public Transaction beginWriteTransaction(String table, List<String> families) |
| throws IOException { |
| return beginWriteTransaction(table, families, -1); |
| } |
| |
| /** |
| * This method commits a write transaction. |
| * @param transaction The revision information associated with transaction. |
| * @throws java.io.IOException |
| */ |
| public void commitWriteTransaction(Transaction transaction) throws IOException { |
| refreshTransactionList(transaction.getTableName()); |
| |
| String lockPath = prepareLockNode(transaction.getTableName()); |
| WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, |
| Ids.OPEN_ACL_UNSAFE); |
| RMLockListener myLockListener = new RMLockListener(); |
| wLock.setLockListener(myLockListener); |
| try { |
| boolean lockGrabbed = wLock.lock(); |
| if (lockGrabbed == false) { |
| //TO DO : Let this request queue up and try obtaining lock. |
| throw new IOException( |
| "Unable to obtain lock while commiting transaction. " |
| + transaction.toString()); |
| } else { |
| String tableName = transaction.getTableName(); |
| List<String> colFamilies = transaction.getColumnFamilies(); |
| FamilyRevision revisionData = transaction.getFamilyRevisionInfo(); |
| for (String cfamily : colFamilies) { |
| String path = PathUtil.getRunningTxnInfoPath( |
| baseDir, tableName, cfamily); |
| zkUtil.updateData(path, revisionData, |
| ZKUtil.UpdateMode.REMOVE); |
| } |
| |
| } |
| } catch (KeeperException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| } catch (InterruptedException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| } |
| finally { |
| wLock.unlock(); |
| } |
| LOG.info("Write Transaction committed: " + transaction.toString()); |
| } |
| |
| /** |
| * This method aborts a write transaction. |
| * @param transaction |
| * @throws java.io.IOException |
| */ |
| public void abortWriteTransaction(Transaction transaction) throws IOException { |
| |
| refreshTransactionList(transaction.getTableName()); |
| String lockPath = prepareLockNode(transaction.getTableName()); |
| WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, |
| Ids.OPEN_ACL_UNSAFE); |
| RMLockListener myLockListener = new RMLockListener(); |
| wLock.setLockListener(myLockListener); |
| try { |
| boolean lockGrabbed = wLock.lock(); |
| if (lockGrabbed == false) { |
| //TO DO : Let this request queue up and try obtaining lock. |
| throw new IOException( |
| "Unable to obtain lock while aborting transaction. " |
| + transaction.toString()); |
| } else { |
| String tableName = transaction.getTableName(); |
| List<String> colFamilies = transaction.getColumnFamilies(); |
| FamilyRevision revisionData = transaction |
| .getFamilyRevisionInfo(); |
| for (String cfamily : colFamilies) { |
| String path = PathUtil.getRunningTxnInfoPath( |
| baseDir, tableName, cfamily); |
| zkUtil.updateData(path, revisionData, |
| ZKUtil.UpdateMode.REMOVE); |
| path = PathUtil.getAbortInformationPath(baseDir, |
| tableName, cfamily); |
| zkUtil.updateData(path, revisionData, |
| ZKUtil.UpdateMode.APPEND); |
| } |
| |
| } |
| } catch (KeeperException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| } catch (InterruptedException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| } |
| finally { |
| wLock.unlock(); |
| } |
| LOG.info("Write Transaction aborted: " + transaction.toString()); |
| } |
| |
| |
| /* @param transaction |
| /* @throws IOException |
| * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction) |
| */ |
| public void keepAlive(Transaction transaction) |
| throws IOException { |
| |
| refreshTransactionList(transaction.getTableName()); |
| transaction.keepAliveTransaction(); |
| String lockPath = prepareLockNode(transaction.getTableName()); |
| WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, |
| Ids.OPEN_ACL_UNSAFE); |
| RMLockListener myLockListener = new RMLockListener(); |
| wLock.setLockListener(myLockListener); |
| try { |
| boolean lockGrabbed = wLock.lock(); |
| if (lockGrabbed == false) { |
| //TO DO : Let this request queue up and try obtaining lock. |
| throw new IOException( |
| "Unable to obtain lock for keep alive of transaction. " |
| + transaction.toString()); |
| }else { |
| String tableName = transaction.getTableName(); |
| List<String> colFamilies = transaction.getColumnFamilies(); |
| FamilyRevision revisionData = transaction.getFamilyRevisionInfo(); |
| for (String cfamily : colFamilies) { |
| String path = PathUtil.getRunningTxnInfoPath( |
| baseDir, tableName, cfamily); |
| zkUtil.updateData(path, revisionData, |
| ZKUtil.UpdateMode.KEEP_ALIVE); |
| } |
| |
| } |
| } catch (KeeperException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| } catch (InterruptedException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| }finally { |
| wLock.unlock(); |
| } |
| |
| } |
| |
| /* This method allows the user to create latest snapshot of a |
| /* table. |
| /* @param tableName The table whose snapshot is being created. |
| /* @return TableSnapshot An instance of TableSnaphot |
| /* @throws IOException |
| * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String) |
| */ |
| public TableSnapshot createSnapshot(String tableName) throws IOException{ |
| refreshTransactionList(tableName); |
| long latestID = zkUtil.currentID(tableName); |
| HashMap<String, Long> cfMap = new HashMap<String, Long>(); |
| List<String> columnFamilyNames = zkUtil.getColumnFamiliesOfTable(tableName); |
| |
| for(String cfName: columnFamilyNames){ |
| String cfPath = PathUtil.getRunningTxnInfoPath(baseDir, tableName, cfName); |
| List<FamilyRevision> tranxList = zkUtil.getTransactionList(cfPath); |
| long version; |
| if (!tranxList.isEmpty()) { |
| Collections.sort(tranxList); |
| // get the smallest running Transaction ID |
| long runningVersion = tranxList.get(0).getRevision(); |
| version = runningVersion -1; |
| } else { |
| version = latestID; |
| } |
| cfMap.put(cfName, version); |
| } |
| |
| TableSnapshot snapshot = new TableSnapshot(tableName, cfMap,latestID); |
| LOG.debug("Created snapshot For table: "+tableName+" snapshot: "+snapshot); |
| return snapshot; |
| } |
| |
| /* This method allows the user to create snapshot of a |
| /* table with a given revision number. |
| /* @param tableName |
| /* @param revision |
| /* @return TableSnapshot |
| /* @throws IOException |
| * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String, long) |
| */ |
| public TableSnapshot createSnapshot(String tableName, long revision) throws IOException{ |
| |
| long currentID = zkUtil.currentID(tableName); |
| if (revision > currentID) { |
| throw new IOException( |
| "The revision specified in the snapshot is higher than the current revision of the table."); |
| } |
| refreshTransactionList(tableName); |
| HashMap<String, Long> cfMap = new HashMap<String, Long>(); |
| List<String> columnFamilies = zkUtil.getColumnFamiliesOfTable(tableName); |
| |
| for(String cf: columnFamilies){ |
| cfMap.put(cf, revision); |
| } |
| |
| return new TableSnapshot(tableName, cfMap, revision); |
| } |
| |
| /** |
| * Get the list of in-progress Transactions for a column family |
| * @param table the table name |
| * @param columnFamily the column family name |
| * @return a list of in-progress WriteTransactions |
| * @throws java.io.IOException |
| */ |
| List<FamilyRevision> getRunningTransactions(String table, |
| String columnFamily) throws IOException { |
| String path = PathUtil.getRunningTxnInfoPath(baseDir, table, |
| columnFamily); |
| return zkUtil.getTransactionList(path); |
| } |
| |
| @Override |
| public List<FamilyRevision> getAbortedWriteTransactions(String table, |
| String columnFamily) throws IOException { |
| String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily); |
| return zkUtil.getTransactionList(path); |
| } |
| |
| private void refreshTransactionList(String tableName) throws IOException{ |
| String lockPath = prepareLockNode(tableName); |
| WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath, |
| Ids.OPEN_ACL_UNSAFE); |
| RMLockListener myLockListener = new RMLockListener(); |
| wLock.setLockListener(myLockListener); |
| try { |
| boolean lockGrabbed = wLock.lock(); |
| if (lockGrabbed == false) { |
| //TO DO : Let this request queue up and try obtaining lock. |
| throw new IOException( |
| "Unable to obtain lock while refreshing transactions of table " |
| + tableName + "."); |
| }else { |
| List<String> cfPaths = zkUtil |
| .getColumnFamiliesOfTable(tableName); |
| for (String cf : cfPaths) { |
| String runningDataPath = PathUtil.getRunningTxnInfoPath( |
| baseDir, tableName, cf); |
| zkUtil.refreshTransactions(runningDataPath); |
| } |
| |
| } |
| } catch (KeeperException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| } catch (InterruptedException e) { |
| throw new IOException("Exception while obtaining lock.", e); |
| } finally { |
| wLock.unlock(); |
| } |
| |
| } |
| |
| private String prepareLockNode(String tableName) throws IOException{ |
| String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName); |
| String lockPath = PathUtil.getLockManagementNode(txnDataPath); |
| zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| return lockPath; |
| } |
| |
| /* |
| * This class is a listener class for the locks used in revision management. |
| * TBD: Use the following class to signal that that the lock is actually |
| * been granted. |
| */ |
| class RMLockListener implements LockListener { |
| |
| /* |
| * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired() |
| */ |
| @Override |
| public void lockAcquired() { |
| |
| } |
| |
| /* |
| * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased() |
| */ |
| @Override |
| public void lockReleased() { |
| |
| } |
| |
| } |
| |
| |
| } |