fixes #978 Made async commit comprehensible (#1001)
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 2d9e2cc..ef13bc8 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -27,21 +27,20 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Result;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.ColumnUpdate;
@@ -105,6 +104,9 @@
private static final Bytes RLOCK_VAL =
Bytes.of("special rlock value 94da84e7796ff3b23b779805d820a33f1997cb8b");
+ // added to avoid findbugs false positive
+ private static final Supplier<Void> NULLS = () -> null;
+
private static boolean isWrite(Bytes val) {
return val != NTFY_VAL && val != RLOCK_VAL;
}
@@ -117,7 +119,7 @@
return val == RLOCK_VAL;
}
- private static enum TxStatus {
+ private enum TxStatus {
OPEN, COMMIT_STARTED, COMMITTED, CLOSED
}
@@ -136,10 +138,6 @@
private TxStatus status = TxStatus.OPEN;
private boolean commitAttempted = false;
- // for testing
- private boolean stopAfterPreCommit = false;
- private boolean stopAfterPrimaryCommit = false;
-
public TransactionImpl(Environment env, Notification trigger, long startTs) {
Objects.requireNonNull(env, "environment cannot be null");
Preconditions.checkArgument(startTs >= 0, "startTs cannot be negative");
@@ -522,12 +520,13 @@
checkIfOpen();
status = TxStatus.COMMIT_STARTED;
commitAttempted = true;
-
- stopAfterPreCommit = true;
}
SyncCommitObserver sco = new SyncCommitObserver();
- beginCommitAsync(cd, sco, primary);
+ cd = setUpBeginCommitAsync(cd, sco, primary);
+ if (cd != null) {
+ beginCommitAsyncTest(cd);
+ }
try {
sco.waitForCommit();
} catch (AlreadyAcknowledgedException e) {
@@ -554,7 +553,7 @@
*
* @param cd Commit data
*/
- private void readUnread(CommitData cd, Consumer<Entry<Key, Value>> locksSeen) throws Exception {
+ private void readUnread(CommitData cd, Consumer<Entry<Key, Value>> locksSeen) {
// TODO make async
// TODO need to keep track of ranges read (not ranges passed in, but actual data read... user
// may not iterate over entire range
@@ -619,14 +618,13 @@
if (rowColsToCheck.size() > 0) {
- long startTime = System.currentTimeMillis();
long waitTime = SnapshotScanner.INITIAL_WAIT_TIME;
boolean resolved = false;
List<Entry<Key, Value>> openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
- startTime = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis();
while (!resolved) {
resolved = LockResolver.resolveLocks(env, startTs, stats, openReadLocks, startTime);
@@ -701,23 +699,6 @@
return false;
}
- @VisibleForTesting
- public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
- stopAfterPrimaryCommit = true;
-
- SyncCommitObserver sco = new SyncCommitObserver();
- cd.commitObserver = sco;
- try {
- beginSecondCommitPhase(cd, commitStamp);
- sco.waitForCommit();
- } catch (CommitException e) {
- return false;
- } catch (Exception e) {
- throw new FluoException(e);
- }
- return true;
- }
-
public CommitData createCommitData() {
CommitData cd = new CommitData();
cd.cw = env.getSharedResources().getConditionalWriter();
@@ -728,9 +709,8 @@
@Override
public synchronized void commit() throws CommitException {
- SyncCommitObserver sco = null;
try {
- sco = new SyncCommitObserver();
+ SyncCommitObserver sco = new SyncCommitObserver();
commitAsync(sco);
sco.waitForCommit();
} finally {
@@ -807,7 +787,7 @@
// CHECKSTYLE:OFF
@Override
- protected void finalize() throws Throwable {
+ protected void finalize() {
// CHECKSTYLE:ON
// TODO Log an error if transaction is not closed (See FLUO-486)
close(false);
@@ -818,34 +798,6 @@
return startTs;
}
- /**
- * Funcitonal interface to provide next step of asynchronous commit on successful completion of
- * the previous one
- */
- private static interface OnSuccessInterface<V> {
- public void onSuccess(V result) throws Exception;
- }
-
- private abstract static class SynchronousCommitTask implements Runnable {
-
- private CommitData cd;
-
- SynchronousCommitTask(CommitData cd) {
- this.cd = cd;
- }
-
- protected abstract void runCommitStep(CommitData cd) throws Exception;
-
- @Override
- public void run() {
- try {
- runCommitStep(cd);
- } catch (Exception e) {
- cd.commitObserver.failed(e);
- }
- }
- }
-
@Override
public int getSize() {
// TODO could calculate as items are added/set
@@ -874,25 +826,6 @@
return size;
}
- private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
- OnSuccessInterface<V> onSuccessInterface) {
- cfuture.handleAsync((result, exception) -> {
- if (exception != null) {
- cd.commitObserver.failed(exception);
- return null;
- } else {
- try {
- onSuccessInterface.onSuccess(result);
- return null;
- } catch (Exception e) {
- cd.commitObserver.failed(e);
- return null;
- }
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
- }
-
- // TODO exception handling!!!! How?????
abstract class CommitStep {
private CommitStep nextStep;
@@ -927,13 +860,13 @@
abstract class ConditionalStep extends CommitStep {
- CommitData cd;
-
public abstract Collection<ConditionalMutation> createMutations(CommitData cd);
- public abstract Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results);
+ public abstract Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+ throws Exception;
- public abstract boolean processResults(CommitData cd, Iterator<Result> results);
+ public abstract boolean processResults(CommitData cd, Iterator<Result> results)
+ throws Exception;
public AsyncConditionalWriter getACW(CommitData cd) {
return cd.acw;
@@ -942,7 +875,6 @@
@Override
CompletableFuture<Boolean> getMainOp(CommitData cd) {
// TODO not sure threading is correct
- // TODO handle unknown
Executor ace = env.getSharedResources().getAsyncCommitExecutor();
return getACW(cd).apply(createMutations(cd)).thenCompose(results -> {
// ugh icky that this is an iterator, forces copy to inspect.. could refactor async CW to
@@ -951,17 +883,32 @@
Iterators.addAll(resultsList, results);
boolean containsUknown = false;
for (Result result : resultsList) {
- containsUknown |= result.getStatus() == Status.UNKNOWN;
+ try {
+ containsUknown |= result.getStatus() == Status.UNKNOWN;
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
}
-
if (containsUknown) {
// process unknown in sync executor
Executor se = env.getSharedResources().getSyncCommitExecutor();
- return CompletableFuture.supplyAsync(() -> handleUnknown(cd, resultsList.iterator()), se);
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return handleUnknown(cd, resultsList.iterator());
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, se);
} else {
return CompletableFuture.completedFuture(resultsList.iterator());
}
- }).thenApplyAsync(results -> processResults(cd, results), ace);
+ }).thenApplyAsync(results -> {
+ try {
+ return processResults(cd, results);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, ace);
}
@@ -976,7 +923,8 @@
}
@Override
- public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results) {
+ public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+ throws Exception {
Result result = Iterators.getOnlyElement(results);
Status mutationStatus = result.getStatus();
@@ -986,14 +934,20 @@
switch (txInfo.status) {
case LOCKED:
- return Collections.singleton(new Result(Status.ACCEPTED, result.getMutation(), result.getTabletServer())).iterator();
+ return Collections
+ .singleton(
+ new Result(Status.ACCEPTED, result.getMutation(), result.getTabletServer()))
+ .iterator();
case ROLLED_BACK:
- return Collections.singleton(new Result(Status.REJECTED, result.getMutation(), result.getTabletServer())).iterator();
+ return Collections
+ .singleton(
+ new Result(Status.REJECTED, result.getMutation(), result.getTabletServer()))
+ .iterator();
case UNKNOWN:
// TODO async
Result newResult = cd.cw.write(result.getMutation());
mutationStatus = newResult.getStatus();
- if(mutationStatus != Status.UNKNOWN) {
+ if (mutationStatus != Status.UNKNOWN) {
return Collections.singleton(newResult).iterator();
}
// TODO handle case were data other tx has lock
@@ -1006,26 +960,30 @@
}
}
- //TODO
+ // TODO
throw new IllegalStateException();
}
@Override
- public boolean processResults(CommitData cd, Iterator<Result> results) {
+ public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception {
Result result = Iterators.getOnlyElement(results);
return result.getStatus() == Status.ACCEPTED;
}
@Override
CompletableFuture<Void> getFailureOp(CommitData cd) {
- //TODO can this be simplified by pushing some code to the superclass?
+ // TODO can this be simplified by pushing some code to the superclass?
return CompletableFuture.supplyAsync(() -> {
- ConditionalMutation pcm = Iterables.getOnlyElement(createMutations(cd));
+ final ConditionalMutation pcm = Iterables.getOnlyElement(createMutations(cd));
cd.addPrimaryToRejected();
getStats().setRejected(cd.getRejected());
// TODO do async
- checkForOrphanedLocks(cd);
+ try {
+ checkForOrphanedLocks(cd);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
if (checkForAckCollision(pcm)) {
cd.commitObserver.alreadyAcknowledged();
} else {
@@ -1041,6 +999,12 @@
class LockOtherStep extends ConditionalStep {
@Override
+ public AsyncConditionalWriter getACW(CommitData cd) {
+ return cd.bacw;
+ }
+
+
+ @Override
public Collection<ConditionalMutation> createMutations(CommitData cd) {
ArrayList<ConditionalMutation> mutations = new ArrayList<>();
@@ -1072,7 +1036,7 @@
}
@Override
- public boolean processResults(CommitData cd, Iterator<Result> results) {
+ public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception {
while (results.hasNext()) {
Result result = results.next();
@@ -1092,13 +1056,15 @@
CompletableFuture<Void> getFailureOp(CommitData cd) {
return CompletableFuture.supplyAsync(() -> {
getStats().setRejected(cd.getRejected());
- checkForOrphanedLocks(cd);
+ try {
+ // Does this need to be async?
+ checkForOrphanedLocks(cd);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
return null;
}, env.getSharedResources().getSyncCommitExecutor()).thenCompose(v -> rollbackLocks(cd));
-
-
}
-
}
abstract class BatchWriterStep extends CommitStep {
@@ -1119,23 +1085,130 @@
private CompletableFuture<Void> rollbackLocks(CommitData cd) {
- // TODO
- return null;
+ CommitStep firstStep = new RollbackOtherLocks();
+ firstStep.andThen(new RollbackPrimaryLock());
+
+ return firstStep.compose(cd)
+ .thenRun(() -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
+
+ }
+
+
+ class RollbackOtherLocks extends BatchWriterStep {
+
+ @Override
+ public Collection<Mutation> createMutations(CommitData cd) {
+ // roll back locks
+
+ // TODO let rollback be done lazily? this makes GC more difficult
+
+ Flutation m;
+
+ ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
+ for (Bytes row : cd.acceptedRows) {
+ m = new Flutation(env, row);
+ for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
+ if (isReadLock(entry.getValue())) {
+ m.put(entry.getKey(),
+ ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+ DelReadLockValue.encodeRollback());
+ } else {
+ m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
+ DelLockValue.encodeRollback(false, true));
+ }
+ }
+ mutations.add(m);
+ }
+
+ return mutations;
+ }
+ }
+
+ class RollbackPrimaryLock extends BatchWriterStep {
+
+ @Override
+ public Collection<Mutation> createMutations(CommitData cd) {
+ // mark transaction as complete for garbage collection purposes
+ Flutation m = new Flutation(env, cd.prow);
+
+ m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+ DelLockValue.encodeRollback(startTs, true, true));
+ m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
+
+ return Collections.singletonList(m);
+ }
+ }
+
+ class CommittedTestStep extends CommitStep {
+ CompletableFuture<Boolean> getMainOp(CommitData cd) {
+ cd.commitObserver.committed();
+ return CompletableFuture.completedFuture(true);
+ }
+
+ CompletableFuture<Void> getFailureOp(CommitData cd) {
+ throw new IllegalStateException("Failure not expected");
+ }
+ }
+
+ @VisibleForTesting
+ public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
+
+ SyncCommitObserver sco = new SyncCommitObserver();
+ cd.commitObserver = sco;
+ try {
+ CommitStep firstStep = new GetCommitStampStepTest(commitStamp);
+
+ firstStep.andThen(new WriteNotificationsStep()).andThen(new CommitPrimaryStep())
+ .andThen(new CommittedTestStep());
+
+ firstStep.compose(cd).exceptionally(throwable -> {
+ cd.commitObserver.failed(throwable);
+ return null;
+ });
+ sco.waitForCommit();
+ } catch (CommitException e) {
+ return false;
+ } catch (Exception e) {
+ throw new FluoException(e);
+ }
+ return true;
}
class GetCommitStampStep extends CommitStep {
+ protected CompletableFuture<Stamp> getStampOp() {
+ return env.getSharedResources().getOracleClient().getStampAsync();
+ }
@Override
CompletableFuture<Boolean> getMainOp(CommitData cd) {
- // TODO Auto-generated method stub
- // TODO set commitTs on commit data
- return null;
+
+ return getStampOp().thenApply(commitStamp -> {
+ if (startTs < commitStamp.getGcTimestamp()) {
+ return false;
+ } else {
+ getStats().setCommitTs(commitStamp.getTxTimestamp());
+ return true;
+ }
+ });
}
@Override
CompletableFuture<Void> getFailureOp(CommitData cd) {
- // TODO Auto-generated method stub
- return null;
+ return rollbackLocks(cd);
+ }
+
+ }
+
+ class GetCommitStampStepTest extends GetCommitStampStep {
+ private final Stamp testStamp;
+
+ public GetCommitStampStepTest(Stamp testStamp) {
+ this.testStamp = testStamp;
+ }
+
+ @Override
+ protected CompletableFuture<Stamp> getStampOp() {
+ return CompletableFuture.completedFuture(testStamp);
}
}
@@ -1144,8 +1217,42 @@
@Override
public Collection<Mutation> createMutations(CommitData cd) {
+ long commitTs = getStats().getCommitTs();
HashMap<Bytes, Mutation> mutations = new HashMap<>();
- // TODO copy code from writeNotificationsAsync()
+
+ if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
+ Flutation m = new Flutation(env, cd.prow);
+ Notification.put(env, m, cd.pcol, commitTs);
+ mutations.put(cd.prow, m);
+ }
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
+
+ for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
+ if (observedColumns.contains(colUpdates.getKey())) {
+ Bytes val = colUpdates.getValue();
+ if (isWrite(val) && !isDelete(val)) {
+ Mutation m = mutations.get(rowUpdates.getKey());
+ if (m == null) {
+ m = new Flutation(env, rowUpdates.getKey());
+ mutations.put(rowUpdates.getKey(), m);
+ }
+ Notification.put(env, m, colUpdates.getKey(), commitTs);
+ }
+ }
+ }
+ }
+
+ for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
+ Mutation m = mutations.get(entry.getKey());
+ if (m == null) {
+ m = new Flutation(env, entry.getKey());
+ mutations.put(entry.getKey(), m);
+ }
+ for (Column col : entry.getValue()) {
+ Notification.put(env, m, col, commitTs);
+ }
+ }
return mutations.values();
}
@@ -1155,36 +1262,107 @@
@Override
public Collection<ConditionalMutation> createMutations(CommitData cd) {
- // TODO Auto-generated method stub
- return null;
+ long commitTs = getStats().getCommitTs();
+ IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
+ PrewriteIterator.setSnaptime(iterConf, startTs);
+ boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
+
+ Condition lockCheck =
+ new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
+ cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
+ final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
+
+ ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
+ isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
+
+ return Collections.singletonList(delLockMutation);
}
@Override
- public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results) {
- // TODO Auto-generated method stub
- return null;
+ public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+ throws Exception {
+ // the code for handing this is synchronous and needs to be handled in another thread pool
+ // TODO - how do we do the above without return a CF?
+ long commitTs = getStats().getCommitTs();
+ Result result = Iterators.getOnlyElement(results);
+ Status ms = result.getStatus();
+
+ while (ms == Status.UNKNOWN) {
+
+ // TODO async
+ TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
+
+ switch (txInfo.status) {
+ case COMMITTED:
+ if (txInfo.commitTs != commitTs) {
+ throw new IllegalStateException(
+ cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
+ }
+ ms = Status.ACCEPTED;
+ break;
+ case LOCKED:
+ // TODO async
+ ConditionalMutation delLockMutation = result.getMutation();
+ ms = cd.cw.write(delLockMutation).getStatus();
+ break;
+ default:
+ ms = Status.REJECTED;
+ }
+ }
+ Result newResult = new Result(ms, result.getMutation(), result.getTabletServer());
+ return Collections.singletonList(newResult).iterator();
}
@Override
- public boolean processResults(CommitData cd, Iterator<Result> results) {
- // TODO Auto-generated method stub
- return false;
+ public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception {
+ Result result = Iterators.getOnlyElement(results);
+ return result.getStatus() == Status.ACCEPTED;
}
@Override
CompletableFuture<Void> getFailureOp(CommitData cd) {
- // TODO Auto-generated method stub
- return null;
+ cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
+ return CompletableFuture.completedFuture(NULLS.get());
}
}
+ @VisibleForTesting
+ public boolean finishCommit(CommitData cd, Stamp commitStamp) {
+ getStats().setCommitTs(commitStamp.getTxTimestamp());
+
+ CommitStep firstStep = new DeleteLocksStep();
+ firstStep.andThen(new FinishCommitStep());
+ firstStep.compose(cd).exceptionally(throwable -> {
+ System.err.println("Unexpected exception in finish commit test method : ");
+ throwable.printStackTrace();
+ return null;
+ });
+
+ return true;
+ }
+
+
class DeleteLocksStep extends BatchWriterStep {
@Override
public Collection<Mutation> createMutations(CommitData cd) {
- // TODO Auto-generated method stub
- return null;
+ long commitTs = getStats().getCommitTs();
+ ArrayList<Mutation> mutations = new ArrayList<>(updates.size() + 1);
+ for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
+ Flutation m = new Flutation(env, rowUpdates.getKey());
+ boolean isTriggerRow = isTriggerRow(rowUpdates.getKey());
+ for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
+ ColumnUtil.commitColumn(env,
+ isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false,
+ colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()),
+ isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m);
+ }
+
+ mutations.add(m);
+ }
+
+ return mutations;
}
}
@@ -1192,9 +1370,33 @@
class FinishCommitStep extends BatchWriterStep {
@Override
+ CompletableFuture<Boolean> getMainOp(CommitData cd) {
+ return super.getMainOp(cd).thenApply(b -> {
+ Preconditions.checkArgument(b);
+ cd.commitObserver.committed();
+ return true;
+ });
+ }
+
+ @Override
public Collection<Mutation> createMutations(CommitData cd) {
- // TODO Auto-generated method stub
- return null;
+ long commitTs = getStats().getCommitTs();
+ ArrayList<Mutation> afterFlushMutations = new ArrayList<>(2);
+
+ Flutation m = new Flutation(env, cd.prow);
+ // mark transaction as complete for garbage collection purposes
+ m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY);
+ afterFlushMutations.add(m);
+
+ if (weakNotification != null) {
+ afterFlushMutations.add(weakNotification.newDelete(env, startTs));
+ }
+
+ if (notification != null) {
+ afterFlushMutations.add(notification.newDelete(env, startTs));
+ }
+
+ return afterFlushMutations;
}
}
@@ -1208,21 +1410,23 @@
try {
CommitData cd = createCommitData();
- beginCommitAsync(cd, commitCallback, null);
+ cd = setUpBeginCommitAsync(cd, commitCallback, null);
+ if (cd != null) {
+ beginCommitAsync(cd);
+ }
} catch (Exception e) {
e.printStackTrace();
commitCallback.failed(e);
}
}
- private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
+ private CommitData setUpBeginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
RowColumn primary) {
-
if (updates.size() == 0) {
// TODO do async
deleteWeakRow();
commitCallback.committed();
- return;
+ return null;
}
for (Map<Column, Bytes> cols : updates.values()) {
@@ -1257,7 +1461,7 @@
// there are only read locks, so nothing to write
deleteWeakRow();
commitCallback.committed();
- return;
+ return null;
}
}
@@ -1272,360 +1476,48 @@
cd.commitObserver = commitCallback;
+ return cd;
+ }
+
+ private void beginCommitAsync(CommitData cd) {
+
+ // Notification are written between GetCommitStampStep and CommitPrimaryStep for the following
+ // reasons :
+ // * At this point all columns are locked, this guarantees that anything triggering as a
+ // result of this transaction will see all of this transactions changes.
+ // * The transaction is not yet committed. If the process dies at this point whatever
+ // was running this transaction should rerun and recreate all of the notifications.
+ // The next transactions will rerun because this transaction will have to be rolled back.
+ // * If notifications are written in the 2nd phase of commit, then when the 2nd phase
+ // partially succeeds notifications may never be written. Because in the case of failure
+ // notifications would not be written until a column is read and it may never be read.
+ // See https://github.com/fluo-io/fluo/issues/642
+ //
+ // Its very important the notifications which trigger an observer are deleted after the 2nd
+ // phase of commit finishes.
+
CommitStep firstStep = new LockPrimaryStep();
- firstStep.andThen(new LockOtherStep())
- .andThen(new GetCommitStampStep())
- .andThen(new WriteNotificationsStep())
- .andThen(new CommitPrimaryStep())
- .andThen(new DeleteLocksStep())
- .andThen(new FinishCommitStep());
+ firstStep.andThen(new LockOtherStep()).andThen(new GetCommitStampStep())
+ .andThen(new WriteNotificationsStep()).andThen(new CommitPrimaryStep())
+ .andThen(new DeleteLocksStep()).andThen(new FinishCommitStep());
- firstStep.compose(cd);
+ firstStep.compose(cd).exceptionally(throwable -> {
+ cd.commitObserver.failed(throwable);
+ return null;
+ });
}
- private void postLockPrimary(final CommitData cd, final ConditionalMutation pcm, Result result)
- throws Exception {
- final Status mutationStatus = result.getStatus();
+ private void beginCommitAsyncTest(CommitData cd) {
- if (mutationStatus == Status.ACCEPTED) {
- lockOtherColumns(cd);
- } else {
- env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd) {
- @Override
- protected void runCommitStep(CommitData cd) throws Exception {
- synchronousPostLockPrimary(cd, pcm, mutationStatus);
- }
- });
- }
- }
+ CommitStep firstStep = new LockPrimaryStep();
- private void synchronousPostLockPrimary(CommitData cd, ConditionalMutation pcm,
- Status mutationStatus) throws AccumuloException, AccumuloSecurityException, Exception {
- // TODO convert this code to async
- while (mutationStatus == Status.UNKNOWN) {
- TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
+ firstStep.andThen(new LockOtherStep()).andThen(new CommittedTestStep());
- switch (txInfo.status) {
- case LOCKED:
- mutationStatus = Status.ACCEPTED;
- break;
- case ROLLED_BACK:
- mutationStatus = Status.REJECTED;
- break;
- case UNKNOWN:
- // TODO async
- mutationStatus = cd.cw.write(pcm).getStatus();
- // TODO handle case were data other tx has lock
- break;
- case COMMITTED:
- default:
- throw new IllegalStateException(
- "unexpected tx state " + txInfo.status + " " + cd.prow + " " + cd.pcol);
-
- }
- }
-
- if (mutationStatus != Status.ACCEPTED) {
- cd.addPrimaryToRejected();
- getStats().setRejected(cd.getRejected());
- // TODO do async
- checkForOrphanedLocks(cd);
- if (checkForAckCollision(pcm)) {
- cd.commitObserver.alreadyAcknowledged();
- } else {
- cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
- }
- return;
- }
-
- lockOtherColumns(cd);
- }
-
- private void lockOtherColumns(CommitData cd) {
- ArrayList<ConditionalMutation> mutations = new ArrayList<>();
-
- for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
- ConditionalFlutation cm = null;
-
- for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
- if (cm == null) {
- cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(), colUpdates.getValue(), cd.prow,
- cd.pcol, false);
- } else {
- prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow, cd.pcol, false);
- }
- }
-
- mutations.add(cm);
- }
-
- cd.acceptedRows = new HashSet<>();
-
- CompletableFuture<Iterator<Result>> cfuture = cd.bacw.apply(mutations);
- addCallback(cfuture, cd, results -> postLockOther(cd, results));
- }
-
- private void postLockOther(final CommitData cd, Iterator<Result> results) throws Exception {
- while (results.hasNext()) {
- Result result = results.next();
- // TODO handle unknown?
- Bytes row = Bytes.of(result.getMutation().getRow());
- if (result.getStatus() == Status.ACCEPTED) {
- cd.acceptedRows.add(row);
- } else {
- cd.addToRejected(row, updates.get(row).keySet());
- }
- }
-
- if (cd.getRejected().size() > 0) {
- getStats().setRejected(cd.getRejected());
- env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd) {
- @Override
- protected void runCommitStep(CommitData cd) throws Exception {
- checkForOrphanedLocks(cd);
- rollbackOtherLocks(cd);
- }
- });
- } else if (stopAfterPreCommit) {
- cd.commitObserver.committed();
- } else {
- CompletableFuture<Stamp> cfuture = env.getSharedResources().getOracleClient().getStampAsync();
- addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp));
- }
- }
-
- private void rollbackOtherLocks(CommitData cd) throws Exception {
- // roll back locks
-
- // TODO let rollback be done lazily? this makes GC more difficult
-
- Flutation m;
-
- ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
- for (Bytes row : cd.acceptedRows) {
- m = new Flutation(env, row);
- for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
- if (isReadLock(entry.getValue())) {
- m.put(entry.getKey(), ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
- DelReadLockValue.encodeRollback());
- } else {
- m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
- DelLockValue.encodeRollback(false, true));
- }
- }
- mutations.add(m);
- }
-
- CompletableFuture<Void> cfuture =
- env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
- addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd));
- }
-
- private void rollbackPrimaryLock(CommitData cd) throws Exception {
-
- // mark transaction as complete for garbage collection purposes
- Flutation m = new Flutation(env, cd.prow);
-
- m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
- DelLockValue.encodeRollback(startTs, true, true));
- m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
-
- CompletableFuture<Void> cfuture =
- env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m);
- addCallback(cfuture, cd,
- result -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
- }
-
- private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception {
- if (startTs < commitStamp.getGcTimestamp()) {
- rollbackOtherLocks(cd);
- } else {
- // Notification are written here for the following reasons :
- // * At this point all columns are locked, this guarantees that anything triggering as a
- // result of this transaction will see all of this transactions changes.
- // * The transaction is not yet committed. If the process dies at this point whatever
- // was running this transaction should rerun and recreate all of the notifications.
- // The next transactions will rerun because this transaction will have to be rolled back.
- // * If notifications are written in the 2nd phase of commit, then when the 2nd phase
- // partially succeeds notifications may never be written. Because in the case of failure
- // notifications would not be written until a column is read and it may never be read.
- // See https://github.com/fluo-io/fluo/issues/642
- //
- // Its very important the notifications which trigger an observer are deleted after the 2nd
- // phase of commit finishes.
- getStats().setCommitTs(commitStamp.getTxTimestamp());
- writeNotificationsAsync(cd, commitStamp.getTxTimestamp());
- }
- }
-
- private void writeNotificationsAsync(CommitData cd, final long commitTs) {
-
- HashMap<Bytes, Mutation> mutations = new HashMap<>();
-
- if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
- Flutation m = new Flutation(env, cd.prow);
- Notification.put(env, m, cd.pcol, commitTs);
- mutations.put(cd.prow, m);
- }
-
- for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
-
- for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
- if (observedColumns.contains(colUpdates.getKey())) {
- Bytes val = colUpdates.getValue();
- if (isWrite(val) && !isDelete(val)) {
- Mutation m = mutations.get(rowUpdates.getKey());
- if (m == null) {
- m = new Flutation(env, rowUpdates.getKey());
- mutations.put(rowUpdates.getKey(), m);
- }
- Notification.put(env, m, colUpdates.getKey(), commitTs);
- }
- }
- }
- }
-
- for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
- Mutation m = mutations.get(entry.getKey());
- if (m == null) {
- m = new Flutation(env, entry.getKey());
- mutations.put(entry.getKey(), m);
- }
- for (Column col : entry.getValue()) {
- Notification.put(env, m, col, commitTs);
- }
- }
-
- CompletableFuture<Void> cfuture =
- env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
- addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs));
- }
-
- private void commmitPrimary(CommitData cd, final long commitTs) {
- // try to delete lock and add write for primary column
- IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
- PrewriteIterator.setSnaptime(iterConf, startTs);
- boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
-
- Condition lockCheck =
- new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
- cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
- final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
-
- ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
- isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
-
- CompletableFuture<Iterator<Result>> cfuture =
- cd.acw.apply(Collections.singletonList(delLockMutation));
- addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
- Iterators.getOnlyElement(result)));
- }
-
- private void handleUnkownStatsAfterPrimary(CommitData cd, final long commitTs,
- final ConditionalMutation delLockMutation, Result result) throws Exception {
-
- final Status mutationStatus = result.getStatus();
- if (mutationStatus == Status.UNKNOWN) {
- // the code for handing this is synchronous and needs to be handled in another thread pool
- Runnable task = new SynchronousCommitTask(cd) {
- @Override
- protected void runCommitStep(CommitData cd) throws Exception {
-
- Status ms = mutationStatus;
-
- while (ms == Status.UNKNOWN) {
-
- // TODO async
- TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
-
- switch (txInfo.status) {
- case COMMITTED:
- if (txInfo.commitTs != commitTs) {
- throw new IllegalStateException(
- cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
- }
- ms = Status.ACCEPTED;
- break;
- case LOCKED:
- // TODO async
- ms = cd.cw.write(delLockMutation).getStatus();
- break;
- default:
- ms = Status.REJECTED;
- }
- }
-
- postCommitPrimary(cd, commitTs, ms);
- }
- };
-
- env.getSharedResources().getSyncCommitExecutor().execute(task);
- } else {
- postCommitPrimary(cd, commitTs, mutationStatus);
- }
- }
-
- private void postCommitPrimary(CommitData cd, long commitTs, Status mutationStatus)
- throws Exception {
- if (mutationStatus != Status.ACCEPTED) {
- cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
- } else {
- if (stopAfterPrimaryCommit) {
- cd.commitObserver.committed();
- } else {
- deleteLocks(cd, commitTs);
- }
- }
- }
-
- private void deleteLocks(CommitData cd, final long commitTs) {
- // delete locks and add writes for other columns
- ArrayList<Mutation> mutations = new ArrayList<>(updates.size() + 1);
- for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
- Flutation m = new Flutation(env, rowUpdates.getKey());
- boolean isTriggerRow = isTriggerRow(rowUpdates.getKey());
- for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
- ColumnUtil.commitColumn(env,
- isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false,
- colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()),
- isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m);
- }
-
- mutations.add(m);
- }
-
- CompletableFuture<Void> cfuture =
- env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
- addCallback(cfuture, cd, result -> finishCommit(cd, commitTs));
- }
-
- @VisibleForTesting
- public boolean finishCommit(CommitData cd, Stamp commitStamp)
- throws TableNotFoundException, MutationsRejectedException {
- deleteLocks(cd, commitStamp.getTxTimestamp());
- return true;
- }
-
- private void finishCommit(CommitData cd, long commitTs) {
- ArrayList<Mutation> afterFlushMutations = new ArrayList<>(2);
-
- Flutation m = new Flutation(env, cd.prow);
- // mark transaction as complete for garbage collection purposes
- m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY);
- afterFlushMutations.add(m);
-
- if (weakNotification != null) {
- afterFlushMutations.add(weakNotification.newDelete(env, startTs));
- }
-
- if (notification != null) {
- afterFlushMutations.add(notification.newDelete(env, startTs));
- }
-
- env.getSharedResources().getBatchWriter().writeMutationsAsync(afterFlushMutations);
-
- cd.commitObserver.committed();
+ firstStep.compose(cd).exceptionally(throwable -> {
+ cd.commitObserver.failed(throwable);
+ return null;
+ });
}
public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
index a175500..02ed35d 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
@@ -20,10 +20,7 @@
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
@@ -124,12 +121,11 @@
tx.close();
}
- public CommitData createCommitData() throws TableNotFoundException {
+ public CommitData createCommitData() {
return tx.createCommitData();
}
- public boolean preCommit(CommitData cd) throws AlreadyAcknowledgedException,
- TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ public boolean preCommit(CommitData cd) throws AlreadyAcknowledgedException {
return tx.preCommit(cd);
}
@@ -137,13 +133,11 @@
return tx.preCommit(cd, primary);
}
- public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp)
- throws AccumuloException, AccumuloSecurityException {
+ public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
return tx.commitPrimaryColumn(cd, commitStamp);
}
- public void finishCommit(CommitData cd, Stamp commitStamp)
- throws MutationsRejectedException, TableNotFoundException {
+ public void finishCommit(CommitData cd, Stamp commitStamp) {
tx.finishCommit(cd, commitStamp);
env.getSharedResources().getBatchWriter().waitForAsyncFlush();
}