| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.hcatalog.hbase.snapshot; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hcatalog.hbase.SkeletonHBaseTest; |
| import org.apache.hcatalog.hbase.snapshot.transaction.thrift.StoreFamilyRevision; |
| import org.apache.hcatalog.hbase.snapshot.transaction.thrift.StoreFamilyRevisionList; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.Stat; |
| import org.junit.Test; |
| |
| public class TestRevisionManager extends SkeletonHBaseTest { |
| |
| @Test |
| public void testBasicZNodeCreation() throws IOException, KeeperException, InterruptedException { |
| |
| int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); |
| String servers = getHbaseConf().get("hbase.zookeeper.quorum"); |
| String[] splits = servers.split(","); |
| StringBuffer sb = new StringBuffer(); |
| for (String split : splits) { |
| sb.append(split); |
| sb.append(':'); |
| sb.append(port); |
| } |
| |
| ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base"); |
| String tableName = newTableName("testTable"); |
| List<String> columnFamilies = Arrays.asList("cf001", "cf002", "cf003"); |
| |
| zkutil.createRootZNodes(); |
| ZooKeeper zk = zkutil.getSession(); |
| Stat tempTwo = zk.exists("/rm_base" + PathUtil.DATA_DIR, false); |
| assertTrue(tempTwo != null); |
| Stat tempThree = zk.exists("/rm_base" + PathUtil.CLOCK_NODE, false); |
| assertTrue(tempThree != null); |
| |
| zkutil.setUpZnodesForTable(tableName, columnFamilies); |
| String transactionDataTablePath = "/rm_base" + PathUtil.DATA_DIR + "/" + tableName; |
| Stat result = zk.exists(transactionDataTablePath, false); |
| assertTrue(result != null); |
| |
| for (String colFamiliy : columnFamilies) { |
| String cfPath = transactionDataTablePath + "/" + colFamiliy; |
| Stat resultTwo = zk.exists(cfPath, false); |
| assertTrue(resultTwo != null); |
| } |
| |
| } |
| |
| @Test |
| public void testCommitTransaction() throws IOException { |
| |
| int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); |
| String servers = getHbaseConf().get("hbase.zookeeper.quorum"); |
| String[] splits = servers.split(","); |
| StringBuffer sb = new StringBuffer(); |
| for (String split : splits) { |
| sb.append(split); |
| sb.append(':'); |
| sb.append(port); |
| } |
| |
| Configuration conf = RevisionManagerConfiguration.create(getHbaseConf()); |
| conf.set(RMConstants.ZOOKEEPER_DATADIR, "/rm_base"); |
| ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); |
| manager.initialize(conf); |
| manager.open(); |
| ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base"); |
| |
| String tableName = newTableName("testTable"); |
| List<String> columnFamilies = Arrays.asList("cf1", "cf2", "cf3"); |
| Transaction txn = manager.beginWriteTransaction(tableName, |
| columnFamilies); |
| |
| List<String> cfs = zkutil.getColumnFamiliesOfTable(tableName); |
| assertTrue(cfs.size() == columnFamilies.size()); |
| for (String cf : cfs) { |
| assertTrue(columnFamilies.contains(cf)); |
| } |
| |
| for (String colFamily : columnFamilies) { |
| String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamily); |
| byte[] data = zkutil.getRawData(path, null); |
| StoreFamilyRevisionList list = new StoreFamilyRevisionList(); |
| ZKUtil.deserialize(list, data); |
| assertEquals(list.getRevisionListSize(), 1); |
| StoreFamilyRevision lightTxn = list.getRevisionList().get(0); |
| assertEquals(lightTxn.timestamp, txn.getTransactionExpireTimeStamp()); |
| assertEquals(lightTxn.revision, txn.getRevisionNumber()); |
| |
| } |
| manager.commitWriteTransaction(txn); |
| for (String colFamiliy : columnFamilies) { |
| String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamiliy); |
| byte[] data = zkutil.getRawData(path, null); |
| StoreFamilyRevisionList list = new StoreFamilyRevisionList(); |
| ZKUtil.deserialize(list, data); |
| assertEquals(list.getRevisionListSize(), 0); |
| |
| } |
| |
| manager.close(); |
| } |
| |
| @Test |
| public void testAbortTransaction() throws IOException { |
| |
| int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); |
| String host = getHbaseConf().get("hbase.zookeeper.quorum"); |
| Configuration conf = RevisionManagerConfiguration.create(getHbaseConf()); |
| conf.set(RMConstants.ZOOKEEPER_DATADIR, "/rm_base"); |
| ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); |
| manager.initialize(conf); |
| manager.open(); |
| ZKUtil zkutil = new ZKUtil(host + ':' + port, "/rm_base"); |
| |
| String tableName = newTableName("testTable"); |
| List<String> columnFamilies = Arrays.asList("cf1", "cf2", "cf3"); |
| Transaction txn = manager.beginWriteTransaction(tableName, columnFamilies); |
| List<String> cfs = zkutil.getColumnFamiliesOfTable(tableName); |
| |
| assertTrue(cfs.size() == columnFamilies.size()); |
| for (String cf : cfs) { |
| assertTrue(columnFamilies.contains(cf)); |
| } |
| |
| for (String colFamiliy : columnFamilies) { |
| String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamiliy); |
| byte[] data = zkutil.getRawData(path, null); |
| StoreFamilyRevisionList list = new StoreFamilyRevisionList(); |
| ZKUtil.deserialize(list, data); |
| assertEquals(list.getRevisionListSize(), 1); |
| StoreFamilyRevision lightTxn = list.getRevisionList().get(0); |
| assertEquals(lightTxn.timestamp, txn.getTransactionExpireTimeStamp()); |
| assertEquals(lightTxn.revision, txn.getRevisionNumber()); |
| |
| } |
| manager.abortWriteTransaction(txn); |
| for (String colFamiliy : columnFamilies) { |
| String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamiliy); |
| byte[] data = zkutil.getRawData(path, null); |
| StoreFamilyRevisionList list = new StoreFamilyRevisionList(); |
| ZKUtil.deserialize(list, data); |
| assertEquals(list.getRevisionListSize(), 0); |
| |
| } |
| |
| for (String colFamiliy : columnFamilies) { |
| String path = PathUtil.getAbortInformationPath("/rm_base", tableName, colFamiliy); |
| byte[] data = zkutil.getRawData(path, null); |
| StoreFamilyRevisionList list = new StoreFamilyRevisionList(); |
| ZKUtil.deserialize(list, data); |
| assertEquals(list.getRevisionListSize(), 1); |
| StoreFamilyRevision abortedTxn = list.getRevisionList().get(0); |
| assertEquals(abortedTxn.getRevision(), txn.getRevisionNumber()); |
| } |
| manager.close(); |
| } |
| |
| @Test |
| public void testKeepAliveTransaction() throws InterruptedException, IOException { |
| |
| int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); |
| String servers = getHbaseConf().get("hbase.zookeeper.quorum"); |
| String[] splits = servers.split(","); |
| StringBuffer sb = new StringBuffer(); |
| for (String split : splits) { |
| sb.append(split); |
| sb.append(':'); |
| sb.append(port); |
| } |
| |
| Configuration conf = RevisionManagerConfiguration.create(getHbaseConf()); |
| conf.set(RMConstants.ZOOKEEPER_DATADIR, "/rm_base"); |
| ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); |
| manager.initialize(conf); |
| manager.open(); |
| String tableName = newTableName("testTable"); |
| List<String> columnFamilies = Arrays.asList("cf1", "cf2"); |
| Transaction txn = manager.beginWriteTransaction(tableName, |
| columnFamilies, 40); |
| Thread.sleep(100); |
| try { |
| manager.commitWriteTransaction(txn); |
| } catch (Exception e) { |
| assertTrue(e instanceof IOException); |
| assertEquals(e.getMessage(), |
| "The transaction to be removed not found in the data."); |
| } |
| |
| } |
| |
| @Test |
| public void testCreateSnapshot() throws IOException { |
| int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181); |
| String host = getHbaseConf().get("hbase.zookeeper.quorum"); |
| Configuration conf = RevisionManagerConfiguration.create(getHbaseConf()); |
| conf.set(RMConstants.ZOOKEEPER_DATADIR, "/rm_base"); |
| ZKBasedRevisionManager manager = new ZKBasedRevisionManager(); |
| manager.initialize(conf); |
| manager.open(); |
| String tableName = newTableName("testTable"); |
| List<String> cfOne = Arrays.asList("cf1", "cf2"); |
| List<String> cfTwo = Arrays.asList("cf2", "cf3"); |
| Transaction tsx1 = manager.beginWriteTransaction(tableName, cfOne); |
| Transaction tsx2 = manager.beginWriteTransaction(tableName, cfTwo); |
| TableSnapshot snapshotOne = manager.createSnapshot(tableName); |
| assertEquals(snapshotOne.getRevision("cf1"), 0); |
| assertEquals(snapshotOne.getRevision("cf2"), 0); |
| assertEquals(snapshotOne.getRevision("cf3"), 1); |
| |
| List<String> cfThree = Arrays.asList("cf1", "cf3"); |
| Transaction tsx3 = manager.beginWriteTransaction(tableName, cfThree); |
| manager.commitWriteTransaction(tsx1); |
| TableSnapshot snapshotTwo = manager.createSnapshot(tableName); |
| assertEquals(snapshotTwo.getRevision("cf1"), 2); |
| assertEquals(snapshotTwo.getRevision("cf2"), 1); |
| assertEquals(snapshotTwo.getRevision("cf3"), 1); |
| |
| manager.commitWriteTransaction(tsx2); |
| TableSnapshot snapshotThree = manager.createSnapshot(tableName); |
| assertEquals(snapshotThree.getRevision("cf1"), 2); |
| assertEquals(snapshotThree.getRevision("cf2"), 3); |
| assertEquals(snapshotThree.getRevision("cf3"), 2); |
| manager.commitWriteTransaction(tsx3); |
| TableSnapshot snapshotFour = manager.createSnapshot(tableName); |
| assertEquals(snapshotFour.getRevision("cf1"), 3); |
| assertEquals(snapshotFour.getRevision("cf2"), 3); |
| assertEquals(snapshotFour.getRevision("cf3"), 3); |
| |
| } |
| |
| |
| } |