| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.fluo.integration.impl; |
| |
| import java.util.Iterator; |
| import java.util.Map.Entry; |
| |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.client.admin.CompactionConfig; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.fluo.accumulo.format.FluoFormatter; |
| import org.apache.fluo.accumulo.util.ColumnConstants; |
| import org.apache.fluo.accumulo.util.ColumnType; |
| import org.apache.fluo.accumulo.util.LongUtil; |
| import org.apache.fluo.accumulo.util.ZookeeperUtil; |
| import org.apache.fluo.accumulo.values.DelLockValue; |
| import org.apache.fluo.api.client.TransactionBase; |
| import org.apache.fluo.api.data.Bytes; |
| import org.apache.fluo.api.data.Column; |
| import org.apache.fluo.api.data.RowColumn; |
| import org.apache.fluo.api.exceptions.CommitException; |
| import org.apache.fluo.api.exceptions.FluoException; |
| import org.apache.fluo.api.observer.Observer; |
| import org.apache.fluo.api.observer.ObserverProvider; |
| import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException; |
| import org.apache.fluo.core.exceptions.StaleScanException; |
| import org.apache.fluo.core.impl.Notification; |
| import org.apache.fluo.core.impl.TransactionImpl; |
| import org.apache.fluo.core.impl.TransactionImpl.CommitData; |
| import org.apache.fluo.core.impl.TransactorNode; |
| import org.apache.fluo.core.oracle.Stamp; |
| import org.apache.fluo.integration.BankUtil; |
| import org.apache.fluo.integration.ITBaseImpl; |
| import org.apache.fluo.integration.TestTransaction; |
| import org.apache.fluo.integration.TestUtil; |
| import org.junit.Assert; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.Timeout; |
| |
| import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG; |
| import static org.apache.fluo.integration.BankUtil.BALANCE; |
| |
| public class FailureIT extends ITBaseImpl { |
| @Rule |
| public Timeout globalTimeout = Timeout.seconds(getTestTimeout()); |
| |
| public static class NullObserver implements Observer { |
| @Override |
| public void process(TransactionBase tx, Bytes row, Column col) throws Exception {} |
| } |
| |
| public static class FailuresObserverProvider implements ObserverProvider { |
| @Override |
| public void provide(Registry or, Context ctx) { |
| or.forColumn(new Column("attr", "lastupdate"), STRONG).useObserver(new NullObserver()); |
| } |
| } |
| |
| @Override |
| protected Class<? extends ObserverProvider> getObserverProviderClass() { |
| return FailuresObserverProvider.class; |
| } |
| |
| @Test |
| public void testRollbackMany() throws Exception { |
| testRollbackManyImpl(true); |
| } |
| |
| @Test |
| public void testRollbackManyTimeout() throws Exception { |
| testRollbackManyImpl(false); |
| } |
| |
| private void testRollbackManyImpl(boolean killTransactor) throws Exception { |
| |
| // test writing lots of columns that need to be rolled back |
| |
| Column col1 = new Column("fam1", "q1"); |
| Column col2 = new Column("fam1", "q2"); |
| |
| TestTransaction tx = new TestTransaction(env); |
| |
| for (int r = 0; r < 10; r++) { |
| String row = Integer.toString(r); |
| tx.set(row, col1, "0" + r + "0"); |
| tx.set(row, col2, "0" + r + "1"); |
| } |
| |
| tx.done(); |
| |
| TransactorNode t2 = new TransactorNode(env); |
| TestTransaction tx2 = new TestTransaction(env, t2); |
| |
| for (int r = 0; r < 10; r++) { |
| tx2.set(r + "", col1, "1" + r + "0"); |
| tx2.set(r + "", col2, "1" + r + "1"); |
| } |
| |
| CommitData cd = tx2.createCommitData(); |
| Assert.assertTrue(tx2.preCommit(cd)); |
| |
| if (killTransactor) { |
| t2.close(); |
| } |
| |
| TestTransaction tx3 = new TestTransaction(env); |
| for (int r = 0; r < 10; r++) { |
| Assert.assertEquals("0" + r + "0", tx3.gets(r + "", col1)); |
| Assert.assertEquals("0" + r + "1", tx3.gets(r + "", col2)); |
| } |
| |
| if (killTransactor) { |
| Stamp commitTs = env.getSharedResources().getOracleClient().getStamp(); |
| Assert.assertThrows(FluoException.class, () -> tx2.commitPrimaryColumn(cd, commitTs)); |
| } else { |
| Stamp commitTs = env.getSharedResources().getOracleClient().getStamp(); |
| Assert.assertFalse(tx2.commitPrimaryColumn(cd, commitTs)); |
| t2.close(); |
| } |
| |
| TestTransaction tx4 = new TestTransaction(env); |
| for (int r = 0; r < 10; r++) { |
| Assert.assertEquals("0" + r + "0", tx4.gets(r + "", col1)); |
| Assert.assertEquals("0" + r + "1", tx4.gets(r + "", col2)); |
| } |
| } |
| |
| @Test |
| public void testRollforwardMany() throws Exception { |
| testRollforwardManyImpl(true); |
| } |
| |
| @Test |
| public void testRollforwardManyTimeout() throws Exception { |
| testRollforwardManyImpl(false); |
| } |
| |
| private void testRollforwardManyImpl(boolean killTransactor) throws Exception { |
| // test writing lots of columns that need to be rolled forward |
| |
| Column col1 = new Column("fam1", "q1"); |
| Column col2 = new Column("fam1", "q2"); |
| |
| TestTransaction tx = new TestTransaction(env); |
| |
| for (int r = 0; r < 10; r++) { |
| tx.set(r + "", col1, "0" + r + "0"); |
| tx.set(r + "", col2, "0" + r + "1"); |
| } |
| |
| tx.done(); |
| |
| TransactorNode t2 = new TransactorNode(env); |
| TestTransaction tx2 = new TestTransaction(env, t2); |
| |
| for (int r = 0; r < 10; r++) { |
| tx2.set(r + "", col1, "1" + r + "0"); |
| tx2.set(r + "", col2, "1" + r + "1"); |
| } |
| |
| CommitData cd = tx2.createCommitData(); |
| Assert.assertTrue(tx2.preCommit(cd)); |
| Stamp commitTs = env.getSharedResources().getOracleClient().getStamp(); |
| Assert.assertTrue(tx2.commitPrimaryColumn(cd, commitTs)); |
| |
| if (killTransactor) { |
| t2.close(); |
| } |
| |
| TestTransaction tx3 = new TestTransaction(env); |
| for (int r = 0; r < 10; r++) { |
| Assert.assertEquals("1" + r + "0", tx3.gets(r + "", col1)); |
| Assert.assertEquals("1" + r + "1", tx3.gets(r + "", col2)); |
| } |
| |
| tx2.finishCommit(cd, commitTs); |
| |
| TestTransaction tx4 = new TestTransaction(env); |
| for (int r = 0; r < 10; r++) { |
| Assert.assertEquals("1" + r + "0", tx4.gets(r + "", col1)); |
| Assert.assertEquals("1" + r + "1", tx4.gets(r + "", col2)); |
| } |
| |
| if (!killTransactor) { |
| t2.close(); |
| } |
| } |
| |
| @Test |
| public void testRollback() throws Exception { |
| // test the case where a scan encounters a stuck lock and rolls it back |
| |
| TestTransaction tx = new TestTransaction(env); |
| |
| tx.set("bob", BALANCE, "10"); |
| tx.set("joe", BALANCE, "20"); |
| tx.set("jill", BALANCE, "60"); |
| |
| tx.done(); |
| |
| TestTransaction tx2 = new TestTransaction(env); |
| |
| int bal1 = Integer.parseInt(tx2.gets("bob", BALANCE)); |
| int bal2 = Integer.parseInt(tx2.gets("joe", BALANCE)); |
| |
| tx2.set("bob", BALANCE, (bal1 - 7) + ""); |
| tx2.set("joe", BALANCE, (bal2 + 7) + ""); |
| |
| // get locks |
| CommitData cd = tx2.createCommitData(); |
| Assert.assertTrue(tx2.preCommit(cd)); |
| |
| // test rolling back primary and non-primary columns |
| |
| int bobBal = 10; |
| int joeBal = 20; |
| if (Math.random() < .5) { |
| BankUtil.transfer(env, "joe", "jill", 7); |
| joeBal -= 7; |
| } else { |
| BankUtil.transfer(env, "bob", "jill", 7); |
| bobBal -= 7; |
| } |
| |
| TestTransaction tx4 = new TestTransaction(env); |
| |
| Assert.assertEquals(bobBal + "", tx4.gets("bob", BALANCE)); |
| Assert.assertEquals(joeBal + "", tx4.gets("joe", BALANCE)); |
| Assert.assertEquals("67", tx4.gets("jill", BALANCE)); |
| |
| Stamp commitTs = env.getSharedResources().getOracleClient().getStamp(); |
| Assert.assertFalse(tx2.commitPrimaryColumn(cd, commitTs)); |
| |
| BankUtil.transfer(env, "bob", "joe", 2); |
| bobBal -= 2; |
| joeBal += 2; |
| |
| TestTransaction tx6 = new TestTransaction(env); |
| |
| Assert.assertEquals(bobBal + "", tx6.gets("bob", BALANCE)); |
| Assert.assertEquals(joeBal + "", tx6.gets("joe", BALANCE)); |
| Assert.assertEquals("67", tx6.gets("jill", BALANCE)); |
| } |
| |
| @Test |
| public void testDeadRollback() throws Exception { |
| rollbackTest(true); |
| } |
| |
| @Test |
| public void testTimeoutRollback() throws Exception { |
| rollbackTest(false); |
| } |
| |
| private void rollbackTest(boolean killTransactor) throws Exception { |
| TransactorNode t1 = new TransactorNode(env); |
| |
| TestTransaction tx = new TestTransaction(env); |
| |
| tx.set("bob", BALANCE, "10"); |
| tx.set("joe", BALANCE, "20"); |
| tx.set("jill", BALANCE, "60"); |
| |
| tx.done(); |
| |
| TestTransaction tx2 = new TestTransaction(env, t1); |
| |
| int bal1 = Integer.parseInt(tx2.gets("bob", BALANCE)); |
| int bal2 = Integer.parseInt(tx2.gets("joe", BALANCE)); |
| |
| tx2.set("bob", BALANCE, (bal1 - 7) + ""); |
| tx2.set("joe", BALANCE, (bal2 + 7) + ""); |
| |
| CommitData cd = tx2.createCommitData(); |
| Assert.assertTrue(tx2.preCommit(cd)); |
| |
| if (killTransactor) { |
| t1.close(); |
| } |
| |
| TransactionImpl tx3 = new TransactionImpl(env); |
| Assert.assertEquals(0, tx3.getStats().getDeadLocks()); |
| Assert.assertEquals(0, tx3.getStats().getTimedOutLocks()); |
| |
| int bobFinal = Integer.parseInt(tx3.get(Bytes.of("bob"), BALANCE).toString()); |
| Assert.assertEquals(10, bobFinal); |
| |
| long startTs = tx2.getStartTimestamp(); |
| // one and only one of the rolled back locks should be marked as primary |
| Assert.assertTrue(wasRolledBackPrimary(startTs, "bob") ^ wasRolledBackPrimary(startTs, "joe")); |
| |
| if (killTransactor) { |
| Assert.assertEquals(1, tx3.getStats().getDeadLocks()); |
| Assert.assertEquals(0, tx3.getStats().getTimedOutLocks()); |
| } else { |
| Assert.assertEquals(0, tx3.getStats().getDeadLocks()); |
| Assert.assertEquals(1, tx3.getStats().getTimedOutLocks()); |
| } |
| |
| Stamp commitTs = env.getSharedResources().getOracleClient().getStamp(); |
| |
| if (killTransactor) { |
| Assert.assertThrows(FluoException.class, () -> tx2.commitPrimaryColumn(cd, commitTs)); |
| } else { |
| Assert.assertFalse(tx2.commitPrimaryColumn(cd, commitTs)); |
| t1.close(); |
| } |
| tx3.close(); |
| } |
| |
| @Test |
| public void testRollfoward() throws Exception { |
| // test the case where a scan encounters a stuck lock (for a complete tx) and rolls it forward |
| |
| TestTransaction tx = new TestTransaction(env); |
| |
| tx.set("bob", BALANCE, "10"); |
| tx.set("joe", BALANCE, "20"); |
| tx.set("jill", BALANCE, "60"); |
| |
| tx.done(); |
| |
| TestTransaction tx2 = new TestTransaction(env); |
| |
| int bal1 = Integer.parseInt(tx2.gets("bob", BALANCE)); |
| int bal2 = Integer.parseInt(tx2.gets("joe", BALANCE)); |
| |
| tx2.set("bob", BALANCE, (bal1 - 7) + ""); |
| tx2.set("joe", BALANCE, (bal2 + 7) + ""); |
| |
| // get locks |
| CommitData cd = tx2.createCommitData(); |
| Assert.assertTrue(tx2.preCommit(cd)); |
| Stamp commitTs = env.getSharedResources().getOracleClient().getStamp(); |
| Assert.assertTrue(tx2.commitPrimaryColumn(cd, commitTs)); |
| |
| // test rolling forward primary and non-primary columns |
| int bobBal = 3; |
| int joeBal = 27; |
| |
| if (Math.random() < .5) { |
| BankUtil.transfer(env, "joe", "jill", 2); |
| joeBal = 25; |
| } else { |
| BankUtil.transfer(env, "bob", "jill", 2); |
| bobBal = 1; |
| } |
| |
| TestTransaction tx4 = new TestTransaction(env); |
| |
| Assert.assertEquals(bobBal + "", tx4.gets("bob", BALANCE)); |
| Assert.assertEquals(joeBal + "", tx4.gets("joe", BALANCE)); |
| Assert.assertEquals("62", tx4.gets("jill", BALANCE)); |
| |
| tx2.finishCommit(cd, commitTs); |
| |
| TestTransaction tx5 = new TestTransaction(env); |
| |
| Assert.assertEquals(bobBal + "", tx5.gets("bob", BALANCE)); |
| Assert.assertEquals(joeBal + "", tx5.gets("joe", BALANCE)); |
| Assert.assertEquals("62", tx5.gets("jill", BALANCE)); |
| } |
| |
| @Test |
| public void testAcks() throws Exception { |
| // TODO test that acks are properly handled in rollback and rollforward |
| |
| TestTransaction tx = new TestTransaction(env); |
| |
| final Column lastUpdate = new Column("attr", "lastupdate"); |
| final Column docContent = new Column("doc", "content"); |
| final Column docUrl = new Column("doc", "url"); |
| |
| tx.set("url0000", lastUpdate, "3"); |
| tx.set("url0000", docContent, "abc def"); |
| |
| tx.done(); |
| |
| TestTransaction tx2 = new TestTransaction(env, "url0000", lastUpdate); |
| tx2.set("idx:abc", docUrl, "url0000"); |
| tx2.set("idx:def", docUrl, "url0000"); |
| CommitData cd = tx2.createCommitData(); |
| tx2.preCommit(cd); |
| |
| TestTransaction tx3 = new TestTransaction(env); |
| Assert.assertNull(tx3.gets("idx:abc", docUrl)); |
| Assert.assertNull(tx3.gets("idx:def", docUrl)); |
| Assert.assertEquals("3", tx3.gets("url0000", lastUpdate)); |
| |
| Scanner scanner = env.getAccumuloClient().createScanner(env.getTable(), Authorizations.EMPTY); |
| Notification.configureScanner(scanner); |
| Iterator<Entry<Key, Value>> iter = scanner.iterator(); |
| Assert.assertTrue(iter.hasNext()); |
| Assert.assertEquals("url0000", iter.next().getKey().getRow().toString()); |
| |
| TestTransaction tx5 = new TestTransaction(env, "url0000", lastUpdate); |
| tx5.set("idx:abc", docUrl, "url0000"); |
| tx5.set("idx:def", docUrl, "url0000"); |
| cd = tx5.createCommitData(); |
| Assert.assertTrue(tx5.preCommit(cd)); |
| Stamp commitTs = env.getSharedResources().getOracleClient().getStamp(); |
| Assert.assertTrue(tx5.commitPrimaryColumn(cd, commitTs)); |
| |
| // should roll tx5 forward |
| TestTransaction tx6 = new TestTransaction(env); |
| Assert.assertEquals("3", tx6.gets("url0000", lastUpdate)); |
| Assert.assertEquals("url0000", tx6.gets("idx:abc", docUrl)); |
| Assert.assertEquals("url0000", tx6.gets("idx:def", docUrl)); |
| |
| iter = scanner.iterator(); |
| Assert.assertTrue(iter.hasNext()); |
| |
| // TODO is tx4 start before tx5, then this test will not work because AlreadyAck is not thrown |
| // for overlapping.. CommitException is thrown |
| TestTransaction tx4 = new TestTransaction(env, "url0000", lastUpdate); |
| tx4.set("idx:abc", docUrl, "url0000"); |
| tx4.set("idx:def", docUrl, "url0000"); |
| |
| try { |
| // should not go through if tx5 is properly rolled forward |
| tx4.commit(); |
| Assert.fail(); |
| } catch (AlreadyAcknowledgedException aae) { |
| // do nothing |
| } |
| |
| // commit above should schedule async delete of notification |
| env.getSharedResources().getBatchWriter().waitForAsyncFlush(); |
| iter = scanner.iterator(); |
| Assert.assertFalse(iter.hasNext()); |
| } |
| |
| @Test |
| public void testStaleScanPrevention() throws Exception { |
| |
| TestTransaction tx = new TestTransaction(env); |
| |
| tx.set("bob", BALANCE, "10"); |
| tx.set("joe", BALANCE, "20"); |
| tx.set("jill", BALANCE, "60"); |
| |
| tx.done(); |
| |
| TestTransaction tx2 = new TestTransaction(env); |
| Assert.assertEquals("10", tx2.gets("bob", BALANCE)); |
| |
| BankUtil.transfer(env, "joe", "jill", 1); |
| BankUtil.transfer(env, "joe", "bob", 1); |
| BankUtil.transfer(env, "bob", "joe", 2); |
| BankUtil.transfer(env, "jill", "joe", 2); |
| |
| aClient.tableOperations().flush(table, null, null, true); |
| |
| Assert.assertEquals("20", tx2.gets("joe", BALANCE)); |
| |
| // Stale scan should not occur due to oldest active timestamp tracking in Zookeeper |
| tx2.close(); |
| |
| TestTransaction tx3 = new TestTransaction(env); |
| |
| Assert.assertEquals("9", tx3.gets("bob", BALANCE)); |
| Assert.assertEquals("22", tx3.gets("joe", BALANCE)); |
| Assert.assertEquals("59", tx3.gets("jill", BALANCE)); |
| } |
| |
| @Test(timeout = 60000) |
| public void testForcedStaleScan() throws Exception { |
| |
| TestTransaction tx = new TestTransaction(env); |
| |
| tx.set("bob", BALANCE, "10"); |
| tx.set("joe", BALANCE, "20"); |
| tx.set("jill", BALANCE, "60"); |
| tx.set("john", BALANCE, "3"); |
| |
| tx.done(); |
| |
| TestTransaction tx2 = new TestTransaction(env); |
| Assert.assertEquals("10", tx2.gets("bob", BALANCE)); |
| |
| TestTransaction tx3 = new TestTransaction(env); |
| tx3.gets("john", BALANCE); |
| |
| BankUtil.transfer(env, "joe", "jill", 1); |
| BankUtil.transfer(env, "joe", "bob", 1); |
| BankUtil.transfer(env, "bob", "joe", 2); |
| BankUtil.transfer(env, "jill", "joe", 2); |
| |
| // Force a stale scan be modifying the oldest active timestamp to a |
| // more recent time in Zookeeper. This disables timestamp tracking. |
| Long nextTs = new TestTransaction(env).getStartTs(); |
| CuratorFramework curator = env.getSharedResources().getCurator(); |
| curator.setData().forPath(env.getSharedResources().getTimestampTracker().getNodePath(), |
| LongUtil.toByteArray(nextTs)); |
| |
| long gcTs = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers()); |
| while (gcTs < nextTs) { |
| Thread.sleep(500); |
| // keep setting timestamp tracker time in ZK until GC picks it up |
| curator.setData().forPath(env.getSharedResources().getTimestampTracker().getNodePath(), |
| LongUtil.toByteArray(nextTs)); |
| gcTs = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers()); |
| } |
| |
| // GC iterator will clear data that tx2 wants to scan |
| aClient.tableOperations().flush(table, null, null, true); |
| |
| // this data should have been GCed, but the problem is not detected here |
| Assert.assertNull(tx2.gets("joe", BALANCE)); |
| |
| try { |
| // closing should detect the stale scan |
| tx2.close(); |
| Assert.fail(); |
| } catch (StaleScanException sse) { |
| // do nothing |
| } |
| |
| tx3.set("john", BALANCE, "5"); |
| |
| try { |
| tx3.commit(); |
| Assert.fail(); |
| } catch (CommitException e) { |
| // should not throw an exception |
| tx3.close(); |
| } |
| |
| TestTransaction tx4 = new TestTransaction(env); |
| |
| Assert.assertEquals("9", tx4.gets("bob", BALANCE)); |
| Assert.assertEquals("22", tx4.gets("joe", BALANCE)); |
| Assert.assertEquals("59", tx4.gets("jill", BALANCE)); |
| Assert.assertEquals("3", tx4.gets("john", BALANCE)); |
| } |
| |
| @Test |
| public void testCommitBug1() throws Exception { |
| |
| TestTransaction tx1 = new TestTransaction(env); |
| |
| tx1.set("bob", BALANCE, "10"); |
| tx1.set("joe", BALANCE, "20"); |
| tx1.set("jill", BALANCE, "60"); |
| |
| CommitData cd = tx1.createCommitData(); |
| Assert.assertTrue(tx1.preCommit(cd)); |
| |
| while (true) { |
| TestTransaction tx2 = new TestTransaction(env); |
| |
| tx2.set("bob", BALANCE, "11"); |
| tx2.set("jill", BALANCE, "61"); |
| |
| // tx1 should be rolled back even in case where columns tx1 locked are not read by tx2 |
| try { |
| tx2.commit(); |
| break; |
| } catch (CommitException ce) { |
| // do nothing |
| } |
| } |
| |
| TestTransaction tx4 = new TestTransaction(env); |
| |
| Assert.assertEquals("11", tx4.gets("bob", BALANCE)); |
| Assert.assertNull(tx4.gets("joe", BALANCE)); |
| Assert.assertEquals("61", tx4.gets("jill", BALANCE)); |
| } |
| |
| @Test |
| public void testRollbackSelf() throws Exception { |
| // test for #660... ensure when transaction rolls itself back that it properly sets the primary |
| // flag |
| |
| TestTransaction tx1 = new TestTransaction(env); |
| |
| tx1.set("bob", BALANCE, "10"); |
| tx1.set("joe", BALANCE, "20"); |
| tx1.set("jill", BALANCE, "60"); |
| |
| tx1.done(); |
| |
| final TestTransaction tx2 = new TestTransaction(env, "jill", BALANCE, 1); |
| |
| TestTransaction tx3 = new TestTransaction(env); |
| TestUtil.increment(tx3, "bob", BALANCE, 5); |
| TestUtil.increment(tx3, "joe", BALANCE, -5); |
| tx3.done(); |
| |
| TestUtil.increment(tx2, "bob", BALANCE, 5); |
| TestUtil.increment(tx2, "jill", BALANCE, -5); |
| |
| // should be able to successfully lock the primary column jill... but then should fail to lock |
| // bob and have to rollback |
| try { |
| tx2.commit(); |
| Assert.fail("Expected commit exception"); |
| } catch (CommitException ce) { |
| // do nothing |
| } |
| |
| boolean sawExpected = wasRolledBackPrimary(tx2.getStartTimestamp(), "jill"); |
| |
| Assert.assertTrue(sawExpected); |
| |
| TestTransaction tx4 = new TestTransaction(env); |
| Assert.assertEquals("15", tx4.gets("bob", BALANCE)); |
| Assert.assertEquals("15", tx4.gets("joe", BALANCE)); |
| Assert.assertEquals("60", tx4.gets("jill", BALANCE)); |
| tx4.close(); |
| |
| } |
| |
| private boolean wasRolledBackPrimary(long startTs, String rolledBackRow) |
| throws TableNotFoundException { |
| boolean sawExpected = false; |
| Scanner scanner = aClient.createScanner(getCurTableName(), Authorizations.EMPTY); |
| |
| for (Entry<Key, Value> entry : scanner) { |
| ColumnType colType = ColumnType.from(entry.getKey()); |
| long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; |
| String row = entry.getKey().getRowData().toString(); |
| byte[] val = entry.getValue().get(); |
| |
| if (row.equals(rolledBackRow) && colType == ColumnType.DEL_LOCK && ts == startTs |
| && DelLockValue.isPrimary(val)) { |
| sawExpected = true; |
| } |
| } |
| return sawExpected; |
| } |
| |
| /* |
| * There was a bug where a locked column with an empty family could not be recovered. |
| */ |
| @Test |
| public void testRecoverEmptyColumn() { |
| Column ecol = new Column("", "bal"); |
| |
| TestTransaction tx1 = new TestTransaction(env); |
| |
| tx1.set("bob", ecol, "10"); |
| tx1.set("joe", ecol, "20"); |
| tx1.set("jill", ecol, "60"); |
| |
| tx1.done(); |
| |
| // partially commit a transaction, leaving the row 'joe' with a lock. |
| TestTransaction tx2 = new TestTransaction(env); |
| TestUtil.increment(tx2, "bob", ecol, 5); |
| TestUtil.increment(tx2, "joe", ecol, -5); |
| |
| CommitData cd = tx2.createCommitData(); |
| RowColumn primary = new RowColumn("bob", ecol); |
| Assert.assertTrue(tx2.preCommit(cd, primary)); |
| Stamp commitTs = env.getSharedResources().getOracleClient().getStamp(); |
| tx2.commitPrimaryColumn(cd, commitTs); |
| tx2.close(); |
| |
| // this transaction should roll forward the above transaction |
| TestTransaction tx3 = new TestTransaction(env); |
| Assert.assertEquals("15", tx3.gets("bob", ecol)); |
| Assert.assertEquals("15", tx3.gets("joe", ecol)); |
| Assert.assertEquals("60", tx3.gets("jill", ecol)); |
| tx3.close(); |
| |
| |
| } |
| } |