| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tephra.hbase; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.DoNotRetryIOException; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.Delete; |
| import org.apache.hadoop.hbase.client.Durability; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.OperationWithAttributes; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; |
| import org.apache.hadoop.hbase.coprocessor.ObserverContext; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; |
| import org.apache.hadoop.hbase.filter.CompareFilter; |
| import org.apache.hadoop.hbase.filter.LongComparator; |
| import org.apache.hadoop.hbase.filter.ValueFilter; |
| import org.apache.hadoop.hbase.regionserver.wal.WALEdit; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.tephra.Transaction; |
| import org.apache.tephra.TransactionConflictException; |
| import org.apache.tephra.TransactionContext; |
| import org.apache.tephra.TransactionManager; |
| import org.apache.tephra.TransactionSystemClient; |
| import org.apache.tephra.TxConstants; |
| import org.apache.tephra.TxConstants.ConflictDetection; |
| import org.apache.tephra.hbase.coprocessor.TransactionProcessor; |
| import org.apache.tephra.inmemory.InMemoryTxSystemClient; |
| import org.apache.tephra.metrics.TxMetricsCollector; |
| import org.apache.tephra.persist.HDFSTransactionStateStorage; |
| import org.apache.tephra.persist.InMemoryTransactionStateStorage; |
| import org.apache.tephra.persist.TransactionStateStorage; |
| import org.apache.tephra.snapshot.SnapshotCodecProvider; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Tests for TransactionAwareHTables. |
| */ |
| public class TransactionAwareHTableTest extends AbstractHBaseTableTest { |
| private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class); |
| |
| static TransactionStateStorage txStateStorage; |
| static TransactionManager txManager; |
| private TransactionContext transactionContext; |
| private TransactionAwareHTable transactionAwareHTable; |
| private Table hTable; |
| static Connection conn; |
| |
| @ClassRule |
| public static TemporaryFolder tmpFolder = new TemporaryFolder(); |
| |
| private static MiniDFSCluster dfsCluster; |
| |
| public static void tearDownAfterClass() throws Exception { |
| dfsCluster.shutdown(); |
| } |
| |
| private static final class TestBytes { |
| private static final byte[] table = Bytes.toBytes("testtable"); |
| private static final byte[] family = Bytes.toBytes("f1"); |
| private static final byte[] family2 = Bytes.toBytes("f2"); |
| private static final byte[] qualifier = Bytes.toBytes("col1"); |
| private static final byte[] qualifier2 = Bytes.toBytes("col2"); |
| private static final byte[] row = Bytes.toBytes("row"); |
| private static final byte[] row2 = Bytes.toBytes("row2"); |
| private static final byte[] row3 = Bytes.toBytes("row3"); |
| private static final byte[] row4 = Bytes.toBytes("row4"); |
| private static final byte[] value = Bytes.toBytes("value"); |
| private static final byte[] value2 = Bytes.toBytes("value2"); |
| private static final byte[] value3 = Bytes.toBytes("value3"); |
| } |
| |
| private static final String TEST_ATTRIBUTE = "TEST_ATTRIBUTE"; |
| |
| public static class TestRegionObserver extends BaseRegionObserver { |
| @Override |
| public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, |
| final Put put, final WALEdit edit, |
| final Durability durability) throws IOException { |
| if (put.getAttribute(TEST_ATTRIBUTE) == null) { |
| throw new DoNotRetryIOException("Put should preserve attributes"); |
| } |
| if (put.getDurability() != Durability.USE_DEFAULT) { |
| throw new DoNotRetryIOException("Durability is not propagated correctly"); |
| } |
| } |
| |
| @Override |
| public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c, |
| final Delete delete, final WALEdit edit, |
| final Durability durability) throws IOException { |
| if (delete.getAttribute(TEST_ATTRIBUTE) == null) { |
| throw new DoNotRetryIOException("Delete should preserve attributes"); |
| } |
| if (delete.getDurability() != Durability.USE_DEFAULT) { |
| throw new DoNotRetryIOException("Durability is not propagated correctly"); |
| } |
| } |
| } |
| |
| @BeforeClass |
| public static void setupBeforeClass() throws Exception { |
| testUtil = new HBaseTestingUtility(); |
| conf = testUtil.getConfiguration(); |
| |
| conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath()); |
| dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); |
| |
| conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); |
| conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); |
| |
| conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5); |
| |
| // Tune down the connection thread pool size |
| conf.setInt("hbase.hconnection.threads.core", 5); |
| conf.setInt("hbase.hconnection.threads.max", 10); |
| // Tunn down handler threads in regionserver |
| conf.setInt("hbase.regionserver.handler.count", 10); |
| |
| // Set to random port |
| conf.setInt("hbase.master.port", 0); |
| conf.setInt("hbase.master.info.port", 0); |
| conf.setInt("hbase.regionserver.port", 0); |
| conf.setInt("hbase.regionserver.info.port", 0); |
| |
| testUtil.startMiniCluster(); |
| hBaseAdmin = testUtil.getHBaseAdmin(); |
| conn = testUtil.getConnection(); |
| txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); |
| txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); |
| txManager.startAndWait(); |
| } |
| |
| @AfterClass |
| public static void shutdownAfterClass() throws Exception { |
| if (txManager != null) { |
| txManager.stopAndWait(); |
| } |
| if (conn != null) { |
| conn.close(); |
| } |
| } |
| |
| @Before |
| public void setupBeforeTest() throws Exception { |
| hTable = createTable(TestBytes.table, new byte[][]{TestBytes.family}); |
| transactionAwareHTable = new TransactionAwareHTable(hTable); |
| transactionContext = new TransactionContext(new InMemoryTxSystemClient(txManager), transactionAwareHTable); |
| } |
| |
| @After |
| public void shutdownAfterTest() throws IOException { |
| hBaseAdmin.disableTable(TestBytes.table); |
| hBaseAdmin.deleteTable(TestBytes.table); |
| } |
| |
| /** |
| * Test transactional put and get requests. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testValidTransactionalPutAndGet() throws Exception { |
| transactionContext.start(); |
| Put put = new Put(TestBytes.row); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| transactionAwareHTable.put(put); |
| transactionContext.finish(); |
| |
| transactionContext.start(); |
| Result result = transactionAwareHTable.get(new Get(TestBytes.row)); |
| transactionContext.finish(); |
| |
| byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); |
| assertArrayEquals(TestBytes.value, value); |
| } |
| |
| /** |
| * Test aborted put requests, that must be rolled back. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testAbortedTransactionPutAndGet() throws Exception { |
| transactionContext.start(); |
| Put put = new Put(TestBytes.row); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| transactionAwareHTable.put(put); |
| |
| transactionContext.abort(); |
| |
| transactionContext.start(); |
| Result result = transactionAwareHTable.get(new Get(TestBytes.row)); |
| transactionContext.finish(); |
| byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); |
| assertArrayEquals(value, null); |
| } |
| |
| /** |
| * Test transactional delete operations. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testValidTransactionalDelete() throws Exception { |
| try (Table hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"), |
| new byte[][]{TestBytes.family, TestBytes.family2})) { |
| TransactionAwareHTable txTable = new TransactionAwareHTable(hTable); |
| TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); |
| |
| txContext.start(); |
| Put put = new Put(TestBytes.row); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2); |
| txTable.put(put); |
| txContext.finish(); |
| |
| txContext.start(); |
| Result result = txTable.get(new Get(TestBytes.row)); |
| txContext.finish(); |
| byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); |
| assertArrayEquals(TestBytes.value, value); |
| value = result.getValue(TestBytes.family2, TestBytes.qualifier); |
| assertArrayEquals(TestBytes.value2, value); |
| |
| // test full row delete |
| txContext.start(); |
| Delete delete = new Delete(TestBytes.row); |
| txTable.delete(delete); |
| txContext.finish(); |
| |
| txContext.start(); |
| result = txTable.get(new Get(TestBytes.row)); |
| txContext.finish(); |
| assertTrue(result.isEmpty()); |
| |
| // test column delete |
| // load 10 rows |
| txContext.start(); |
| int rowCount = 10; |
| for (int i = 0; i < rowCount; i++) { |
| Put p = new Put(Bytes.toBytes("row" + i)); |
| for (int j = 0; j < 10; j++) { |
| p.add(TestBytes.family, Bytes.toBytes(j), TestBytes.value); |
| } |
| txTable.put(p); |
| } |
| txContext.finish(); |
| |
| // verify loaded rows |
| txContext.start(); |
| for (int i = 0; i < rowCount; i++) { |
| Get g = new Get(Bytes.toBytes("row" + i)); |
| Result r = txTable.get(g); |
| assertFalse(r.isEmpty()); |
| for (int j = 0; j < 10; j++) { |
| assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, Bytes.toBytes(j))); |
| } |
| } |
| txContext.finish(); |
| |
| // delete odds columns from odd rows and even columns from even rows |
| txContext.start(); |
| for (int i = 0; i < rowCount; i++) { |
| Delete d = new Delete(Bytes.toBytes("row" + i)); |
| for (int j = 0; j < 10; j++) { |
| if (i % 2 == j % 2) { |
| d.deleteColumns(TestBytes.family, Bytes.toBytes(j)); |
| } |
| } |
| txTable.delete(d); |
| } |
| txContext.finish(); |
| |
| // verify deleted columns |
| txContext.start(); |
| for (int i = 0; i < rowCount; i++) { |
| Get g = new Get(Bytes.toBytes("row" + i)); |
| Result r = txTable.get(g); |
| assertEquals(5, r.size()); |
| for (Map.Entry<byte[], byte[]> entry : r.getFamilyMap(TestBytes.family).entrySet()) { |
| int col = Bytes.toInt(entry.getKey()); |
| // each row should only have the opposite mod (odd=even, even=odd) |
| assertNotEquals(i % 2, col % 2); |
| assertArrayEquals(TestBytes.value, entry.getValue()); |
| } |
| } |
| txContext.finish(); |
| |
| // test family delete |
| // load 10 rows |
| txContext.start(); |
| for (int i = 0; i < rowCount; i++) { |
| Put p = new Put(Bytes.toBytes("famrow" + i)); |
| p.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| p.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2); |
| txTable.put(p); |
| } |
| txContext.finish(); |
| |
| // verify all loaded rows |
| txContext.start(); |
| for (int i = 0; i < rowCount; i++) { |
| Get g = new Get(Bytes.toBytes("famrow" + i)); |
| Result r = txTable.get(g); |
| assertEquals(2, r.size()); |
| assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2)); |
| } |
| txContext.finish(); |
| |
| // delete family1 for even rows, family2 for odd rows |
| txContext.start(); |
| for (int i = 0; i < rowCount; i++) { |
| Delete d = new Delete(Bytes.toBytes("famrow" + i)); |
| d.deleteFamily((i % 2 == 0) ? TestBytes.family : TestBytes.family2); |
| txTable.delete(d); |
| } |
| txContext.finish(); |
| |
| // verify deleted families |
| txContext.start(); |
| for (int i = 0; i < rowCount; i++) { |
| Get g = new Get(Bytes.toBytes("famrow" + i)); |
| Result r = txTable.get(g); |
| assertEquals(1, r.size()); |
| if (i % 2 == 0) { |
| assertNull(r.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2)); |
| } else { |
| assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertNull(r.getValue(TestBytes.family2, TestBytes.qualifier2)); |
| } |
| } |
| txContext.finish(); |
| } |
| } |
| |
| /** |
| * Test that put and delete attributes are preserved |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testAttributesPreserved() throws Exception { |
| Table hTable = createTable(Bytes.toBytes("TestAttributesPreserved"), |
| new byte[][]{TestBytes.family, TestBytes.family2}, false, |
| Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName())); |
| try { |
| TransactionAwareHTable txTable = new TransactionAwareHTable(hTable); |
| TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); |
| |
| txContext.start(); |
| Put put = new Put(TestBytes.row); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2); |
| // set an attribute on the put, TestRegionObserver will verify it still exists |
| put.setAttribute(TEST_ATTRIBUTE, new byte[]{}); |
| txTable.put(put); |
| txContext.finish(); |
| |
| txContext.start(); |
| Result result = txTable.get(new Get(TestBytes.row)); |
| txContext.finish(); |
| byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); |
| assertArrayEquals(TestBytes.value, value); |
| value = result.getValue(TestBytes.family2, TestBytes.qualifier); |
| assertArrayEquals(TestBytes.value2, value); |
| |
| // test full row delete, TestRegionObserver will verify it still exists |
| txContext.start(); |
| Delete delete = new Delete(TestBytes.row); |
| delete.setAttribute(TEST_ATTRIBUTE, new byte[]{}); |
| txTable.delete(delete); |
| txContext.finish(); |
| |
| txContext.start(); |
| result = txTable.get(new Get(TestBytes.row)); |
| txContext.finish(); |
| assertTrue(result.isEmpty()); |
| } finally { |
| hTable.close(); |
| } |
| } |
| |
| @Test |
| public void testFamilyDeleteWithCompaction() throws Exception { |
| Table hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"), |
| new byte[][]{TestBytes.family, TestBytes.family2}); |
| try { |
| TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW); |
| TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); |
| |
| txContext.start(); |
| Put put = new Put(TestBytes.row); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| txTable.put(put); |
| |
| put = new Put(TestBytes.row2); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| txTable.put(put); |
| txContext.finish(); |
| |
| txContext.start(); |
| Result result = txTable.get(new Get(TestBytes.row)); |
| txContext.finish(); |
| assertFalse(result.isEmpty()); |
| |
| txContext.start(); |
| // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete) |
| Delete delete = new Delete(TestBytes.row); |
| delete.deleteFamily(TestBytes.family); |
| txTable.delete(delete); |
| txContext.finish(); |
| |
| txContext.start(); |
| result = txTable.get(new Get(TestBytes.row)); |
| txContext.finish(); |
| assertTrue(result.isEmpty()); |
| |
| boolean compactionDone = false; |
| int count = 0; |
| while (count++ < 12 && !compactionDone) { |
| // run major compaction and verify the row was removed |
| Admin hbaseAdmin = testUtil.getHBaseAdmin(); |
| hbaseAdmin.flush(TableName.valueOf("TestFamilyDeleteWithCompaction")); |
| hbaseAdmin.majorCompact(TableName.valueOf("TestFamilyDeleteWithCompaction")); |
| hbaseAdmin.close(); |
| Thread.sleep(5000L); |
| |
| Scan scan = new Scan(); |
| scan.setStartRow(TestBytes.row); |
| scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 })); |
| scan.setRaw(true); |
| |
| ResultScanner scanner = hTable.getScanner(scan); |
| compactionDone = scanner.next() == null; |
| scanner.close(); |
| } |
| assertTrue("Compaction should have removed the row", compactionDone); |
| |
| Scan scan = new Scan(); |
| scan.setStartRow(TestBytes.row2); |
| scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 })); |
| scan.setRaw(true); |
| |
| ResultScanner scanner = hTable.getScanner(scan); |
| Result res = scanner.next(); |
| assertNotNull(res); |
| } finally { |
| hTable.close(); |
| } |
| } |
| |
| /** |
| * Test aborted transactional delete requests, that must be rolled back. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testAbortedTransactionalDelete() throws Exception { |
| transactionContext.start(); |
| Put put = new Put(TestBytes.row); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| transactionAwareHTable.put(put); |
| transactionContext.finish(); |
| |
| transactionContext.start(); |
| Result result = transactionAwareHTable.get(new Get(TestBytes.row)); |
| transactionContext.finish(); |
| byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); |
| assertArrayEquals(TestBytes.value, value); |
| |
| transactionContext.start(); |
| Delete delete = new Delete(TestBytes.row); |
| transactionAwareHTable.delete(delete); |
| transactionContext.abort(); |
| |
| transactionContext.start(); |
| result = transactionAwareHTable.get(new Get(TestBytes.row)); |
| transactionContext.finish(); |
| value = result.getValue(TestBytes.family, TestBytes.qualifier); |
| assertArrayEquals(TestBytes.value, value); |
| } |
| |
| private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception { |
| String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection); |
| Table hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family}); |
| try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) { |
| TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); |
| txContext.start(); |
| txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| txContext.finish(); |
| |
| // Start a tx, delete the row and then abort the tx |
| txContext.start(); |
| txTable.delete(new Delete(TestBytes.row)); |
| txContext.abort(); |
| |
| // Start a tx, delete a column family and then abort the tx |
| txContext.start(); |
| txTable.delete(new Delete(TestBytes.row).deleteFamily(TestBytes.family)); |
| txContext.abort(); |
| |
| // Above operations should have no effect on the row, since they were aborted |
| txContext.start(); |
| Get get = new Get(TestBytes.row); |
| Result result = txTable.get(get); |
| assertFalse(result.isEmpty()); |
| assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| txContext.finish(); |
| } |
| } |
| |
| @Test |
| public void testDeleteRollback() throws Exception { |
| testDeleteRollback(TxConstants.ConflictDetection.ROW); |
| testDeleteRollback(TxConstants.ConflictDetection.COLUMN); |
| testDeleteRollback(TxConstants.ConflictDetection.NONE); |
| } |
| |
| @Test |
| public void testMultiColumnFamilyRowDeleteRollback() throws Exception { |
| Table hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2}); |
| try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) { |
| TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); |
| txContext.start(); |
| txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| txContext.finish(); |
| |
| txContext.start(); |
| //noinspection ConstantConditions |
| txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL); |
| Result result = txTable.get(new Get(TestBytes.row)); |
| Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size()); |
| Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size()); |
| txContext.finish(); |
| |
| //Start a tx, delete the row and then abort the tx |
| txContext.start(); |
| txTable.delete(new Delete(TestBytes.row)); |
| txContext.abort(); |
| |
| //Start a tx and scan all the col families to make sure none of them have delete markers |
| txContext.start(); |
| txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL); |
| result = txTable.get(new Get(TestBytes.row)); |
| Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size()); |
| Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size()); |
| txContext.finish(); |
| } |
| } |
| |
| @Test |
| public void testRowDelete() throws Exception { |
| Table hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2}); |
| try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) { |
| TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); |
| |
| // Test 1: full row delete |
| txContext.start(); |
| txTable.put(new Put(TestBytes.row) |
| .add(TestBytes.family, TestBytes.qualifier, TestBytes.value) |
| .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2) |
| .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value) |
| .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2)); |
| txContext.finish(); |
| |
| txContext.start(); |
| Get get = new Get(TestBytes.row); |
| Result result = txTable.get(get); |
| assertFalse(result.isEmpty()); |
| assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier2)); |
| assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier)); |
| assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family2, TestBytes.qualifier2)); |
| txContext.finish(); |
| |
| // delete entire row |
| txContext.start(); |
| txTable.delete(new Delete(TestBytes.row)); |
| txContext.finish(); |
| |
| // verify row is now empty |
| txContext.start(); |
| result = txTable.get(new Get(TestBytes.row)); |
| assertTrue(result.isEmpty()); |
| |
| // verify row is empty for explicit column retrieval |
| result = txTable.get(new Get(TestBytes.row) |
| .addColumn(TestBytes.family, TestBytes.qualifier) |
| .addFamily(TestBytes.family2)); |
| assertTrue(result.isEmpty()); |
| |
| // verify row is empty for scan |
| ResultScanner scanner = txTable.getScanner(new Scan(TestBytes.row)); |
| assertNull(scanner.next()); |
| scanner.close(); |
| |
| // verify row is empty for scan with explicit column |
| scanner = txTable.getScanner(new Scan(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2)); |
| assertNull(scanner.next()); |
| scanner.close(); |
| txContext.finish(); |
| |
| // write swapped values to one column per family |
| txContext.start(); |
| txTable.put(new Put(TestBytes.row) |
| .add(TestBytes.family, TestBytes.qualifier, TestBytes.value2) |
| .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value)); |
| txContext.finish(); |
| |
| // verify new values appear |
| txContext.start(); |
| result = txTable.get(new Get(TestBytes.row)); |
| assertFalse(result.isEmpty()); |
| assertEquals(2, result.size()); |
| assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2)); |
| |
| scanner = txTable.getScanner(new Scan(TestBytes.row)); |
| Result result1 = scanner.next(); |
| assertNotNull(result1); |
| assertFalse(result1.isEmpty()); |
| assertEquals(2, result1.size()); |
| assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2)); |
| scanner.close(); |
| txContext.finish(); |
| |
| // Test 2: delete of first column family |
| txContext.start(); |
| txTable.put(new Put(TestBytes.row2) |
| .add(TestBytes.family, TestBytes.qualifier, TestBytes.value) |
| .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2) |
| .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value) |
| .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2)); |
| txContext.finish(); |
| |
| txContext.start(); |
| txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family)); |
| txContext.finish(); |
| |
| txContext.start(); |
| Result fam1Result = txTable.get(new Get(TestBytes.row2)); |
| assertFalse(fam1Result.isEmpty()); |
| assertEquals(2, fam1Result.size()); |
| assertArrayEquals(TestBytes.value, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier)); |
| assertArrayEquals(TestBytes.value2, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier2)); |
| txContext.finish(); |
| |
| // Test 3: delete of second column family |
| txContext.start(); |
| txTable.put(new Put(TestBytes.row3) |
| .add(TestBytes.family, TestBytes.qualifier, TestBytes.value) |
| .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2) |
| .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value) |
| .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2)); |
| txContext.finish(); |
| |
| txContext.start(); |
| txTable.delete(new Delete(TestBytes.row3).deleteFamily(TestBytes.family2)); |
| txContext.finish(); |
| |
| txContext.start(); |
| Result fam2Result = txTable.get(new Get(TestBytes.row3)); |
| assertFalse(fam2Result.isEmpty()); |
| assertEquals(2, fam2Result.size()); |
| assertArrayEquals(TestBytes.value, fam2Result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(TestBytes.value2, fam2Result.getValue(TestBytes.family, TestBytes.qualifier2)); |
| txContext.finish(); |
| |
| // Test 4: delete specific rows in a range |
| txContext.start(); |
| for (int i = 0; i < 10; i++) { |
| txTable.put(new Put(Bytes.toBytes("z" + i)) |
| .add(TestBytes.family, TestBytes.qualifier, Bytes.toBytes(i)) |
| .add(TestBytes.family2, TestBytes.qualifier2, Bytes.toBytes(i))); |
| } |
| txContext.finish(); |
| |
| txContext.start(); |
| // delete odd rows |
| for (int i = 1; i < 10; i += 2) { |
| txTable.delete(new Delete(Bytes.toBytes("z" + i))); |
| } |
| txContext.finish(); |
| |
| txContext.start(); |
| int cnt = 0; |
| ResultScanner zScanner = txTable.getScanner(new Scan(Bytes.toBytes("z0"))); |
| Result res; |
| while ((res = zScanner.next()) != null) { |
| assertFalse(res.isEmpty()); |
| assertArrayEquals(Bytes.toBytes("z" + cnt), res.getRow()); |
| assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family2, TestBytes.qualifier2)); |
| cnt += 2; |
| } |
| |
| // Test 5: delete prior writes in the same transaction |
| txContext.start(); |
| txTable.put(new Put(TestBytes.row4) |
| .add(TestBytes.family, TestBytes.qualifier, TestBytes.value) |
| .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2)); |
| txTable.delete(new Delete(TestBytes.row4)); |
| txContext.finish(); |
| |
| txContext.start(); |
| Result row4Result = txTable.get(new Get(TestBytes.row4)); |
| assertTrue(row4Result.isEmpty()); |
| txContext.finish(); |
| } |
| } |
| |
| /** |
| * Expect an exception since a transaction hasn't been started. |
| * |
| * @throws Exception |
| */ |
| @Test(expected = IOException.class) |
| public void testTransactionlessFailure() throws Exception { |
| transactionAwareHTable.get(new Get(TestBytes.row)); |
| } |
| |
| /** |
| * Tests that each transaction can see its own persisted writes, while not seeing writes from other |
| * in-progress transactions. |
| */ |
| @Test |
| public void testReadYourWrites() throws Exception { |
| // In-progress tx1: started before our main transaction |
| Table hTable1 = conn.getTable(TableName.valueOf(TestBytes.table)); |
| TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1); |
| TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1); |
| |
| // In-progress tx2: started while our main transaction is running |
| Table hTable2 = conn.getTable(TableName.valueOf(TestBytes.table)); |
| TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2); |
| TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2); |
| |
| // create an in-progress write that should be ignored |
| byte[] col2 = Bytes.toBytes("col2"); |
| inprogressTxContext1.start(); |
| Put putCol2 = new Put(TestBytes.row); |
| byte[] valueCol2 = Bytes.toBytes("writing in progress"); |
| putCol2.add(TestBytes.family, col2, valueCol2); |
| txHTable1.put(putCol2); |
| |
| // start a tx and write a value to test reading in same tx |
| transactionContext.start(); |
| Put put = new Put(TestBytes.row); |
| byte[] value = Bytes.toBytes("writing"); |
| put.add(TestBytes.family, TestBytes.qualifier, value); |
| transactionAwareHTable.put(put); |
| |
| // test that a write from a tx started after the first is not visible |
| inprogressTxContext2.start(); |
| Put put2 = new Put(TestBytes.row); |
| byte[] value2 = Bytes.toBytes("writing2"); |
| put2.add(TestBytes.family, TestBytes.qualifier, value2); |
| txHTable2.put(put2); |
| |
| Get get = new Get(TestBytes.row); |
| Result row = transactionAwareHTable.get(get); |
| assertFalse(row.isEmpty()); |
| byte[] col1Value = row.getValue(TestBytes.family, TestBytes.qualifier); |
| Assert.assertNotNull(col1Value); |
| Assert.assertArrayEquals(value, col1Value); |
| // write from in-progress transaction should not be visible |
| byte[] col2Value = row.getValue(TestBytes.family, col2); |
| assertNull(col2Value); |
| |
| // commit in-progress transaction, should still not be visible |
| inprogressTxContext1.finish(); |
| |
| get = new Get(TestBytes.row); |
| row = transactionAwareHTable.get(get); |
| assertFalse(row.isEmpty()); |
| col2Value = row.getValue(TestBytes.family, col2); |
| assertNull(col2Value); |
| |
| transactionContext.finish(); |
| |
| inprogressTxContext2.abort(); |
| } |
| |
| @Test |
| public void testRowLevelConflictDetection() throws Exception { |
| TransactionAwareHTable txTable1 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)), |
| TxConstants.ConflictDetection.ROW); |
| TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1); |
| |
| TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)), |
| TxConstants.ConflictDetection.ROW); |
| TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2); |
| |
| byte[] row1 = Bytes.toBytes("row1"); |
| byte[] row2 = Bytes.toBytes("row2"); |
| byte[] col1 = Bytes.toBytes("c1"); |
| byte[] col2 = Bytes.toBytes("c2"); |
| byte[] val1 = Bytes.toBytes("val1"); |
| byte[] val2 = Bytes.toBytes("val2"); |
| |
| // test that concurrent writing to different rows succeeds |
| txContext1.start(); |
| txTable1.put(new Put(row1).add(TestBytes.family, col1, val1)); |
| |
| txContext2.start(); |
| txTable2.put(new Put(row2).add(TestBytes.family, col1, val2)); |
| |
| // should be no conflicts |
| txContext1.finish(); |
| txContext2.finish(); |
| |
| transactionContext.start(); |
| Result res = transactionAwareHTable.get(new Get(row1)); |
| assertFalse(res.isEmpty()); |
| Cell cell = res.getColumnLatestCell(TestBytes.family, col1); |
| assertNotNull(cell); |
| assertArrayEquals(val1, CellUtil.cloneValue(cell)); |
| |
| res = transactionAwareHTable.get(new Get(row2)); |
| assertFalse(res.isEmpty()); |
| cell = res.getColumnLatestCell(TestBytes.family, col1); |
| assertNotNull(cell); |
| assertArrayEquals(val2, CellUtil.cloneValue(cell)); |
| transactionContext.finish(); |
| |
| // test that writing to different columns in the same row fails |
| txContext1.start(); |
| txTable1.put(new Put(row1).add(TestBytes.family, col1, val2)); |
| |
| txContext2.start(); |
| txTable2.put(new Put(row1).add(TestBytes.family, col2, val2)); |
| |
| txContext1.finish(); |
| try { |
| txContext2.finish(); |
| fail("txContext2 should have encountered a row-level conflict during commit"); |
| } catch (TransactionConflictException tce) { |
| txContext2.abort(); |
| } |
| |
| transactionContext.start(); |
| res = transactionAwareHTable.get(new Get(row1)); |
| assertFalse(res.isEmpty()); |
| cell = res.getColumnLatestCell(TestBytes.family, col1); |
| assertNotNull(cell); |
| // should now be val2 |
| assertArrayEquals(val2, CellUtil.cloneValue(cell)); |
| |
| cell = res.getColumnLatestCell(TestBytes.family, col2); |
| // col2 should not be visible due to conflict |
| assertNull(cell); |
| transactionContext.finish(); |
| |
| // test that writing to the same column in the same row fails |
| txContext1.start(); |
| txTable1.put(new Put(row2).add(TestBytes.family, col2, val1)); |
| |
| txContext2.start(); |
| txTable2.put(new Put(row2).add(TestBytes.family, col2, val2)); |
| |
| txContext1.finish(); |
| try { |
| txContext2.finish(); |
| fail("txContext2 should have encountered a row and column level conflict during commit"); |
| } catch (TransactionConflictException tce) { |
| txContext2.abort(); |
| } |
| |
| transactionContext.start(); |
| res = transactionAwareHTable.get(new Get(row2)); |
| assertFalse(res.isEmpty()); |
| cell = res.getColumnLatestCell(TestBytes.family, col2); |
| assertNotNull(cell); |
| // should now be val1 |
| assertArrayEquals(val1, CellUtil.cloneValue(cell)); |
| transactionContext.finish(); |
| |
| // verify change set that is being reported only on rows |
| txContext1.start(); |
| txTable1.put(new Put(row1).add(TestBytes.family, col1, val1)); |
| txTable1.put(new Put(row2).add(TestBytes.family, col2, val2)); |
| |
| Collection<byte[]> changeSet = txTable1.getTxChanges(); |
| assertNotNull(changeSet); |
| assertEquals(2, changeSet.size()); |
| assertTrue(changeSet.contains(txTable1.getChangeKey(row1, null, null))); |
| assertTrue(changeSet.contains(txTable1.getChangeKey(row2, null, null))); |
| txContext1.finish(); |
| } |
| |
| @Test |
| public void testNoneLevelConflictDetection() throws Exception { |
| InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager); |
| TransactionAwareHTable txTable1 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)), |
| TxConstants.ConflictDetection.NONE); |
| TransactionContext txContext1 = new TransactionContext(txClient, txTable1); |
| |
| TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)), |
| TxConstants.ConflictDetection.NONE); |
| TransactionContext txContext2 = new TransactionContext(txClient, txTable2); |
| |
| // overlapping writes to the same row + column should not conflict |
| |
| txContext1.start(); |
| txTable1.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| |
| // changes should not be visible yet |
| txContext2.start(); |
| Result row = txTable2.get(new Get(TestBytes.row)); |
| assertTrue(row.isEmpty()); |
| |
| txTable2.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)); |
| |
| // both commits should succeed |
| txContext1.finish(); |
| txContext2.finish(); |
| |
| txContext1.start(); |
| row = txTable1.get(new Get(TestBytes.row)); |
| assertFalse(row.isEmpty()); |
| assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier)); |
| txContext1.finish(); |
| |
| // transaction abort should still rollback changes |
| |
| txContext1.start(); |
| txTable1.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| txContext1.abort(); |
| |
| // changes to row2 should be rolled back |
| txContext2.start(); |
| Result row2 = txTable2.get(new Get(TestBytes.row2)); |
| assertTrue(row2.isEmpty()); |
| txContext2.finish(); |
| |
| // transaction invalidate should still make changes invisible |
| |
| txContext1.start(); |
| Transaction tx1 = txContext1.getCurrentTransaction(); |
| txTable1.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| assertNotNull(tx1); |
| txClient.invalidate(tx1.getWritePointer()); |
| |
| // changes to row2 should be rolled back |
| txContext2.start(); |
| Result row3 = txTable2.get(new Get(TestBytes.row3)); |
| assertTrue(row3.isEmpty()); |
| txContext2.finish(); |
| } |
| |
| @Test |
| public void testCheckpoint() throws Exception { |
| // start a transaction, using checkpoints between writes |
| transactionContext.start(); |
| transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| Transaction origTx = transactionContext.getCurrentTransaction(); |
| transactionContext.checkpoint(); |
| Transaction postCheckpointTx = transactionContext.getCurrentTransaction(); |
| |
| assertEquals(origTx.getTransactionId(), postCheckpointTx.getTransactionId()); |
| assertNotEquals(origTx.getWritePointer(), postCheckpointTx.getWritePointer()); |
| long[] checkpointPtrs = postCheckpointTx.getCheckpointWritePointers(); |
| assertEquals(1, checkpointPtrs.length); |
| assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs[0]); |
| |
| transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)); |
| transactionContext.checkpoint(); |
| Transaction postCheckpointTx2 = transactionContext.getCurrentTransaction(); |
| |
| assertEquals(origTx.getTransactionId(), postCheckpointTx2.getTransactionId()); |
| assertNotEquals(postCheckpointTx.getWritePointer(), postCheckpointTx2.getWritePointer()); |
| long[] checkpointPtrs2 = postCheckpointTx2.getCheckpointWritePointers(); |
| assertEquals(2, checkpointPtrs2.length); |
| assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs2[0]); |
| assertEquals(postCheckpointTx2.getWritePointer(), checkpointPtrs2[1]); |
| |
| transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| |
| // by default, all rows should be visible with Read-Your-Writes |
| verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value); |
| verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2); |
| verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value); |
| |
| // when disabling current write pointer, only the previous checkpoints should be visible |
| transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); |
| Get get = new Get(TestBytes.row); |
| verifyRow(transactionAwareHTable, get, TestBytes.value); |
| get = new Get(TestBytes.row2); |
| verifyRow(transactionAwareHTable, get, TestBytes.value2); |
| get = new Get(TestBytes.row3); |
| verifyRow(transactionAwareHTable, get, null); |
| |
| // test scan results excluding current write pointer |
| Scan scan = new Scan(); |
| ResultScanner scanner = transactionAwareHTable.getScanner(scan); |
| |
| Result row = scanner.next(); |
| assertNotNull(row); |
| assertArrayEquals(TestBytes.row, row.getRow()); |
| assertEquals(1, row.size()); |
| assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier)); |
| |
| row = scanner.next(); |
| assertNotNull(row); |
| assertArrayEquals(TestBytes.row2, row.getRow()); |
| assertEquals(1, row.size()); |
| assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier)); |
| |
| row = scanner.next(); |
| assertNull(row); |
| scanner.close(); |
| transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT); |
| |
| // commit transaction, verify writes are visible |
| transactionContext.finish(); |
| |
| transactionContext.start(); |
| verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value); |
| verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2); |
| verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value); |
| transactionContext.finish(); |
| } |
| |
| @Test |
| public void testInProgressCheckpoint() throws Exception { |
| // start a transaction, using checkpoints between writes |
| transactionContext.start(); |
| transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| transactionContext.checkpoint(); |
| transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)); |
| |
| // check that writes are still not visible to other clients |
| TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table))); |
| TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2); |
| |
| txContext2.start(); |
| verifyRow(txTable2, TestBytes.row, null); |
| verifyRow(txTable2, TestBytes.row2, null); |
| txContext2.finish(); |
| txTable2.close(); |
| |
| transactionContext.finish(); |
| |
| // verify writes are visible after commit |
| transactionContext.start(); |
| verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value); |
| verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2); |
| transactionContext.finish(); |
| } |
| |
| @Test |
| public void testCheckpointRollback() throws Exception { |
| // start a transaction, using checkpoints between writes |
| transactionContext.start(); |
| transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| transactionContext.checkpoint(); |
| transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)); |
| transactionContext.checkpoint(); |
| transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| |
| transactionContext.abort(); |
| |
| transactionContext.start(); |
| verifyRow(transactionAwareHTable, TestBytes.row, null); |
| verifyRow(transactionAwareHTable, TestBytes.row2, null); |
| verifyRow(transactionAwareHTable, TestBytes.row3, null); |
| |
| Scan scan = new Scan(); |
| ResultScanner scanner = transactionAwareHTable.getScanner(scan); |
| assertNull(scanner.next()); |
| scanner.close(); |
| transactionContext.finish(); |
| } |
| |
| @Test |
| public void testCheckpointInvalidate() throws Exception { |
| // start a transaction, using checkpoints between writes |
| transactionContext.start(); |
| Transaction origTx = transactionContext.getCurrentTransaction(); |
| transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| transactionContext.checkpoint(); |
| Transaction checkpointTx1 = transactionContext.getCurrentTransaction(); |
| transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)); |
| transactionContext.checkpoint(); |
| Transaction checkpointTx2 = transactionContext.getCurrentTransaction(); |
| transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| |
| TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager); |
| txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId()); |
| |
| // check that writes are not visible |
| TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table))); |
| TransactionContext txContext2 = new TransactionContext(txClient, txTable2); |
| txContext2.start(); |
| Transaction newTx = txContext2.getCurrentTransaction(); |
| |
| // all 3 writes pointers from the previous transaction should now be excluded |
| assertTrue(newTx.isExcluded(origTx.getWritePointer())); |
| assertTrue(newTx.isExcluded(checkpointTx1.getWritePointer())); |
| assertTrue(newTx.isExcluded(checkpointTx2.getWritePointer())); |
| |
| verifyRow(txTable2, TestBytes.row, null); |
| verifyRow(txTable2, TestBytes.row2, null); |
| verifyRow(txTable2, TestBytes.row3, null); |
| |
| Scan scan = new Scan(); |
| ResultScanner scanner = txTable2.getScanner(scan); |
| assertNull(scanner.next()); |
| scanner.close(); |
| txContext2.finish(); |
| } |
| |
| @Test |
| public void testExistingData() throws Exception { |
| byte[] val11 = Bytes.toBytes("val11"); |
| byte[] val12 = Bytes.toBytes("val12"); |
| byte[] val21 = Bytes.toBytes("val21"); |
| byte[] val22 = Bytes.toBytes("val22"); |
| byte[] val31 = Bytes.toBytes("val31"); |
| byte[] val111 = Bytes.toBytes("val111"); |
| |
| TransactionAwareHTable txTable = |
| new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true, |
| Collections.singletonList(TransactionProcessor.class.getName()))); |
| TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); |
| |
| // Add some pre-existing, non-transactional data |
| Table nonTxTable = conn.getTable(TableName.valueOf(txTable.getTableName())); |
| nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11)); |
| nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12)); |
| nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21)); |
| nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier2, val22)); |
| nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER, |
| HConstants.EMPTY_BYTE_ARRAY)); |
| nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY)); |
| |
| // Add transactional data |
| txContext.start(); |
| txTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, val31)); |
| txContext.finish(); |
| |
| txContext.start(); |
| // test get |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val11); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12); |
| verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), val21); |
| verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22); |
| verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31); |
| verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER), |
| HConstants.EMPTY_BYTE_ARRAY); |
| verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier), |
| HConstants.EMPTY_BYTE_ARRAY); |
| |
| // test scan |
| try (ResultScanner scanner = txTable.getScanner(new Scan())) { |
| Result result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row, result.getRow()); |
| assertArrayEquals(val11, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2)); |
| result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row2, result.getRow()); |
| assertArrayEquals(val21, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2)); |
| result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row3, result.getRow()); |
| assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row4, result.getRow()); |
| assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, |
| TxConstants.FAMILY_DELETE_QUALIFIER)); |
| assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertNull(scanner.next()); |
| } |
| txContext.finish(); |
| |
| // test update and delete |
| txContext.start(); |
| txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val111)); |
| txTable.delete(new Delete(TestBytes.row2).deleteColumns(TestBytes.family, TestBytes.qualifier)); |
| txContext.finish(); |
| |
| txContext.start(); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val111); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12); |
| verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null); |
| verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22); |
| verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31); |
| verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER), |
| HConstants.EMPTY_BYTE_ARRAY); |
| verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier), |
| HConstants.EMPTY_BYTE_ARRAY); |
| txContext.finish(); |
| |
| // test scan |
| txContext.start(); |
| try (ResultScanner scanner = txTable.getScanner(new Scan())) { |
| Result result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row, result.getRow()); |
| assertArrayEquals(val111, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2)); |
| result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row2, result.getRow()); |
| assertArrayEquals(null, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2)); |
| result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row3, result.getRow()); |
| assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row4, result.getRow()); |
| assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, |
| TxConstants.FAMILY_DELETE_QUALIFIER)); |
| assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| assertNull(scanner.next()); |
| } |
| txContext.finish(); |
| } |
| |
| private void verifyRow(Table table, byte[] rowkey, byte[] expectedValue) throws Exception { |
| verifyRow(table, new Get(rowkey), expectedValue); |
| } |
| |
| private void verifyRow(Table table, Get get, byte[] expectedValue) throws Exception { |
| verifyRows(table, get, expectedValue == null ? null : ImmutableList.of(expectedValue)); |
| } |
| |
| private void verifyRows(Table table, Get get, List<byte[]> expectedValues) throws Exception { |
| Result result = table.get(get); |
| if (expectedValues == null) { |
| assertTrue(result.isEmpty()); |
| } else { |
| assertFalse(result.isEmpty()); |
| byte[] family = TestBytes.family; |
| byte[] col = TestBytes.qualifier; |
| if (get.hasFamilies()) { |
| family = get.getFamilyMap().keySet().iterator().next(); |
| col = get.getFamilyMap().get(family).first(); |
| } |
| Iterator<Cell> it = result.getColumnCells(family, col).iterator(); |
| for (byte[] expectedValue : expectedValues) { |
| Assert.assertTrue(it.hasNext()); |
| assertArrayEquals(expectedValue, CellUtil.cloneValue(it.next())); |
| } |
| } |
| } |
| |
| private Cell[] getRow(Table table, Get get) throws Exception { |
| Result result = table.get(get); |
| return result.rawCells(); |
| } |
| |
| private void verifyScan(Table table, Scan scan, List<KeyValue> expectedCells) throws Exception { |
| List<Cell> actualCells = new ArrayList<>(); |
| try (ResultScanner scanner = table.getScanner(scan)) { |
| Result[] results = scanner.next(expectedCells.size() + 1); |
| for (Result result : results) { |
| actualCells.addAll(Lists.newArrayList(result.rawCells())); |
| } |
| Assert.assertEquals(expectedCells, actualCells); |
| } |
| } |
| |
| @Test |
| public void testVisibilityAll() throws Exception { |
| Table nonTxTable = |
| createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2}, |
| true, Collections.singletonList(TransactionProcessor.class.getName())); |
| TransactionAwareHTable txTable = |
| new TransactionAwareHTable(nonTxTable, |
| TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes |
| TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); |
| |
| // start a transaction and create a delete marker |
| txContext.start(); |
| //noinspection ConstantConditions |
| long txWp0 = txContext.getCurrentTransaction().getWritePointer(); |
| txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier2)); |
| txContext.finish(); |
| |
| // start a new transaction and write some values |
| txContext.start(); |
| @SuppressWarnings("ConstantConditions") |
| long txWp1 = txContext.getCurrentTransaction().getWritePointer(); |
| txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)); |
| txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value)); |
| txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)); |
| txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2)); |
| |
| // verify written data |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), |
| TestBytes.value); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), |
| TestBytes.value2); |
| verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), |
| TestBytes.value); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier), |
| TestBytes.value); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2), |
| TestBytes.value2); |
| |
| // checkpoint and make changes to written data now |
| txContext.checkpoint(); |
| long txWp2 = txContext.getCurrentTransaction().getWritePointer(); |
| // delete a column |
| txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier)); |
| // no change to a column |
| txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)); |
| // update a column |
| txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value3)); |
| // delete column family |
| txTable.delete(new Delete(TestBytes.row).deleteFamily(TestBytes.family2)); |
| |
| // verify changed values |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), |
| null); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), |
| TestBytes.value2); |
| verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), |
| TestBytes.value3); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier), |
| null); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2), |
| null); |
| |
| // run a scan with VisibilityLevel.ALL, this should return all raw changes by this transaction, |
| // and the raw change by prior transaction |
| //noinspection ConstantConditions |
| txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL); |
| List<KeyValue> expected = ImmutableList.of( |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn), |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value), |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2), |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2), |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp0, KeyValue.Type.DeleteColumn), |
| new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily), |
| new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value), |
| new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2), |
| new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3), |
| new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value) |
| ); |
| verifyScan(txTable, new Scan(), expected); |
| |
| // verify a Get is also able to return all snapshot versions |
| Get get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier); |
| Cell[] cells = getRow(txTable, get); |
| Assert.assertEquals(2, cells.length); |
| Assert.assertTrue(CellUtil.isDelete(cells[0])); |
| Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1])); |
| |
| get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2); |
| cells = getRow(txTable, get); |
| Assert.assertEquals(3, cells.length); |
| Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[0])); |
| Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1])); |
| Assert.assertTrue(CellUtil.isDeleteColumns(cells[2])); |
| |
| verifyRows(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), |
| ImmutableList.of(TestBytes.value3, TestBytes.value)); |
| |
| get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier); |
| cells = getRow(txTable, get); |
| Assert.assertEquals(2, cells.length); |
| Assert.assertTrue(CellUtil.isDelete(cells[0])); |
| Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1])); |
| |
| get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2); |
| cells = getRow(txTable, get); |
| Assert.assertEquals(2, cells.length); |
| Assert.assertTrue(CellUtil.isDelete(cells[0])); |
| Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1])); |
| |
| // Verify VisibilityLevel.SNAPSHOT |
| txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT); |
| expected = ImmutableList.of( |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2), |
| new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3) |
| ); |
| verifyScan(txTable, new Scan(), expected); |
| |
| // Verify VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT |
| txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); |
| expected = ImmutableList.of( |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value), |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2), |
| new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value), |
| new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2), |
| new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value) |
| ); |
| verifyScan(txTable, new Scan(), expected); |
| txContext.finish(); |
| |
| // finally verify values once more after commit, this time we should get only committed raw values for |
| // all visibility levels |
| txContext.start(); |
| txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL); |
| expected = ImmutableList.of( |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn), |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2), |
| new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily), |
| new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3) |
| ); |
| verifyScan(txTable, new Scan(), expected); |
| |
| txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT); |
| expected = ImmutableList.of( |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2), |
| new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3) |
| ); |
| verifyScan(txTable, new Scan(), expected); |
| |
| txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); |
| expected = ImmutableList.of( |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2), |
| new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3) |
| ); |
| verifyScan(txTable, new Scan(), expected); |
| |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), |
| null); |
| verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), |
| TestBytes.value2); |
| verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), |
| TestBytes.value3); |
| txContext.finish(); |
| |
| // Test with regular HBase deletes in pre-existing data |
| long now = System.currentTimeMillis(); |
| Delete deleteColumn = new Delete(TestBytes.row3).deleteColumn(TestBytes.family, TestBytes.qualifier, now - 1); |
| // to prevent Tephra from replacing delete with delete marker |
| deleteColumn.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); |
| nonTxTable.delete(deleteColumn); |
| Delete deleteFamily = new Delete(TestBytes.row3).deleteFamily(TestBytes.family2, now); |
| // to prevent Tephra from replacing delete with delete marker |
| deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); |
| nonTxTable.delete(deleteFamily); |
| |
| txContext.start(); |
| txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL); |
| expected = ImmutableList.of( |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn), |
| new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2), |
| new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily), |
| new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3), |
| new KeyValue(TestBytes.row3, TestBytes.family, TestBytes.qualifier, now - 1, KeyValue.Type.Delete), |
| new KeyValue(TestBytes.row3, TestBytes.family2, null, now, KeyValue.Type.DeleteFamily) |
| ); |
| // test scan |
| Scan scan = new Scan(); |
| scan.setRaw(true); |
| verifyScan(txTable, scan, expected); |
| txContext.finish(); |
| } |
| |
| @Test |
| public void testFilters() throws Exception { |
| // Add some values to table |
| transactionContext.start(); |
| Put put = new Put(TestBytes.row); |
| byte[] val1 = Bytes.toBytes(1L); |
| put.add(TestBytes.family, TestBytes.qualifier, val1); |
| transactionAwareHTable.put(put); |
| put = new Put(TestBytes.row2); |
| byte[] val2 = Bytes.toBytes(2L); |
| put.add(TestBytes.family, TestBytes.qualifier, val2); |
| transactionAwareHTable.put(put); |
| put = new Put(TestBytes.row3); |
| byte[] val3 = Bytes.toBytes(3L); |
| put.add(TestBytes.family, TestBytes.qualifier, val3); |
| transactionAwareHTable.put(put); |
| put = new Put(TestBytes.row4); |
| byte[] val4 = Bytes.toBytes(4L); |
| put.add(TestBytes.family, TestBytes.qualifier, val4); |
| transactionAwareHTable.put(put); |
| transactionContext.finish(); |
| |
| // Delete cell with value 2 |
| transactionContext.start(); |
| Delete delete = new Delete(TestBytes.row2); |
| delete.addColumn(TestBytes.family, TestBytes.qualifier); |
| transactionAwareHTable.delete(delete); |
| transactionContext.finish(); |
| |
| // Scan for values less than 4, should get only values 1 and 3 |
| transactionContext.start(); |
| Scan scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(4))); |
| try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) { |
| Result result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row, result.getRow()); |
| assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row3, result.getRow()); |
| assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| result = scanner.next(); |
| assertNull(result); |
| } |
| transactionContext.finish(); |
| |
| // Run a Get with a filter for less than 10 on row4, should get value 4 |
| transactionContext.start(); |
| Get get = new Get(TestBytes.row4); |
| get.setFilter(new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10))); |
| Result result = transactionAwareHTable.get(get); |
| assertFalse(result.isEmpty()); |
| assertArrayEquals(val4, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| transactionContext.finish(); |
| |
| // Change value of row4 to 40 |
| transactionContext.start(); |
| put = new Put(TestBytes.row4); |
| byte[] val40 = Bytes.toBytes(40L); |
| put.add(TestBytes.family, TestBytes.qualifier, val40); |
| transactionAwareHTable.put(put); |
| transactionContext.finish(); |
| |
| // Scan for values less than 10, should get only values 1 and 3 |
| transactionContext.start(); |
| scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10))); |
| try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) { |
| result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row, result.getRow()); |
| assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| result = scanner.next(); |
| assertNotNull(result); |
| assertArrayEquals(TestBytes.row3, result.getRow()); |
| assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier)); |
| result = scanner.next(); |
| assertNull(result); |
| } |
| transactionContext.finish(); |
| |
| // Run the Get again with a filter for less than 10 on row4, this time should not get any results |
| transactionContext.start(); |
| result = transactionAwareHTable.get(get); |
| assertTrue(result.isEmpty()); |
| transactionContext.finish(); |
| } |
| |
| @Test |
| public void testTxLifetime() throws Exception { |
| // Add some initial values |
| transactionContext.start(); |
| Put put = new Put(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| transactionAwareHTable.put(put); |
| put = new Put(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2, TestBytes.value); |
| transactionAwareHTable.put(put); |
| transactionContext.finish(); |
| |
| // Simulate writing with a transaction past its max lifetime |
| transactionContext.start(); |
| Transaction currentTx = transactionContext.getCurrentTransaction(); |
| Assert.assertNotNull(currentTx); |
| |
| // Create a transaction that is past the max lifetime |
| long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, |
| TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); |
| long oldTxId = currentTx.getTransactionId() - ((txMaxLifetimeMillis + 10000) * TxConstants.MAX_TX_PER_MS); |
| Transaction oldTx = new Transaction(currentTx.getReadPointer(), oldTxId, |
| currentTx.getInvalids(), currentTx.getInProgress(), |
| currentTx.getFirstShortInProgress()); |
| transactionAwareHTable.updateTx(oldTx); |
| // Put with the old transaction should fail |
| put = new Put(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| try { |
| transactionAwareHTable.put(put); |
| Assert.fail("Excepted exception with old transaction!"); |
| } catch (IOException e) { |
| // Expected exception |
| } |
| |
| // Delete with the old transaction should also fail |
| Delete delete = new Delete(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier); |
| try { |
| transactionAwareHTable.delete(delete); |
| Assert.fail("Excepted exception with old transaction!"); |
| } catch (IOException e) { |
| // Expected exception |
| } |
| |
| // Now update the table to use the current transaction |
| transactionAwareHTable.updateTx(currentTx); |
| put = new Put(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2, TestBytes.value); |
| transactionAwareHTable.put(put); |
| delete = new Delete(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2); |
| transactionAwareHTable.delete(delete); |
| |
| // Verify values with the same transaction since we cannot commit the old transaction |
| verifyRow(transactionAwareHTable, |
| new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), TestBytes.value); |
| verifyRow(transactionAwareHTable, |
| new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), null); |
| verifyRow(transactionAwareHTable, |
| new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null); |
| verifyRow(transactionAwareHTable, |
| new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), TestBytes.value); |
| transactionContext.finish(); |
| } |
| |
| /** |
| * Tests that transaction co-processor works with older clients |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testOlderClientOperations() throws Exception { |
| // Use old HTable to test |
| TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable); |
| transactionContext.addTransactionAware(oldTxAware); |
| |
| transactionContext.start(); |
| Put put = new Put(TestBytes.row); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); |
| oldTxAware.put(put); |
| transactionContext.finish(); |
| |
| transactionContext.start(); |
| long txId = transactionContext.getCurrentTransaction().getTransactionId(); |
| put = new Put(TestBytes.row); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2); |
| oldTxAware.put(put); |
| // Invalidate the second Put |
| TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager); |
| txClient.invalidate(txId); |
| |
| transactionContext.start(); |
| put = new Put(TestBytes.row); |
| put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3); |
| oldTxAware.put(put); |
| // Abort the third Put |
| transactionContext.abort(); |
| |
| // Get should now return the first value |
| transactionContext.start(); |
| Result result = oldTxAware.get(new Get(TestBytes.row)); |
| transactionContext.finish(); |
| |
| byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); |
| assertArrayEquals(TestBytes.value, value); |
| } |
| |
| /** |
| * Represents older transaction clients |
| */ |
| private static class OldTransactionAwareHTable extends TransactionAwareHTable { |
| public OldTransactionAwareHTable(Table hTable) { |
| super(hTable); |
| } |
| |
| @Override |
| public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { |
| op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); |
| } |
| |
| @Override |
| protected void makeRollbackOperation(Delete delete) { |
| delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); |
| } |
| } |
| } |