blob: 9597662d9e57757a12ced258c81d3c3fc92258ed [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.fluo.core.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Result;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.fluo.accumulo.iterators.PrewriteIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.LockValue;
import org.apache.fluo.accumulo.values.ReadLockValue;
import org.apache.fluo.api.client.AbstractTransactionBase;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.exceptions.AlreadySetException;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.core.async.AsyncCommitObserver;
import org.apache.fluo.core.async.AsyncConditionalWriter;
import org.apache.fluo.core.async.AsyncTransaction;
import org.apache.fluo.core.async.SyncCommitObserver;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.exceptions.StaleScanException;
import org.apache.fluo.core.impl.AsyncReader;
import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.ColumnUtil;
import org.apache.fluo.core.util.ConditionalFlutation;
import org.apache.fluo.core.util.FluoCondition;
import org.apache.fluo.core.util.Flutation;
import org.apache.fluo.core.util.Hex;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.fluo.core.util.UtilWaitThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static;
import static;
/**
 * Transaction implementation
 */
public class TransactionImpl extends AbstractTransactionBase implements AsyncTransaction, Snapshot {
// Zero-length byte array and its Bytes wrapper, shared as empty placeholders.
public static final byte[] EMPTY = new byte[0];
public static final Bytes EMPTY_BS = Bytes.of(EMPTY);
// Sentinel values stored in the updates map instead of user data. isWrite/isDelete/isReadLock
// recognize them by reference (==), not by content.
// DELETE: stored by delete(row, col).
private static final Bytes DELETE =
Bytes.of("special delete object f804266bf94935edd45ae3e6c287b93c1814295c");
// NTFY_VAL: stored by the constructor for the triggering notification's row/column.
private static final Bytes NTFY_VAL =
Bytes.of("special ntfy value ce0c523e6e4dc093be8a2736b82eca1b95f97ed4");
// RLOCK_VAL: stored by setReadLock(row, col).
private static final Bytes RLOCK_VAL =
Bytes.of("special rlock value 94da84e7796ff3b23b779805d820a33f1997cb8b");
// added to avoid findbugs false positive
// Supplier that always yields null; used to complete a CompletableFuture<Void>.
private static final Supplier<Void> NULLS = () -> null;
// Returns true unless val is the notification or read-lock sentinel. The comparison is by
// reference, so the DELETE sentinel and ordinary user values both count as writes.
private static boolean isWrite(Bytes val) {
return val != NTFY_VAL && val != RLOCK_VAL;
// Returns true only when val is the DELETE sentinel instance itself (reference comparison).
private static boolean isDelete(Bytes val) {
return val == DELETE;
// Returns true only when val is the RLOCK_VAL sentinel instance itself (reference comparison).
private static boolean isReadLock(Bytes val) {
return val == RLOCK_VAL;
// Lifecycle state of a transaction. Values referenced in this class: OPEN, COMMIT_STARTED, CLOSED.
private enum TxStatus {
// Start timestamp; used as the snapshot time for reads and to encode this transaction's locks.
private final long startTs;
// Pending changes: row -> column -> value, where the value is user data or one of the sentinels
// (DELETE, NTFY_VAL, RLOCK_VAL).
private final Map<Bytes, Map<Column, Bytes>> updates = new HashMap<>();
// Weak notifications requested through setWeakNotification: row -> columns.
private final Map<Bytes, Set<Column>> weakNotifications = new HashMap<>();
// Columns reported by the configured observers for the STRONG notification type.
private final Set<Column> observedColumns;
private final Environment env;
// Per-row bookkeeping of columns read; maintained through updateColumnsRead.
private final Map<Bytes, Set<Column>> columnsRead = new HashMap<>();
// Tracks row columns that were observed to have had a read lock in the past.
private final Map<Bytes, Set<Column>> readLocksSeen = new HashMap<>();
private final TxStats stats;
// Trigger of this transaction when its column is not weakly observed; may be null.
private Notification notification;
// Trigger of this transaction when its column is weakly observed; reset to null by commit().
private Notification weakNotification;
// Fetched lazily by getTransactorID() unless supplied through setTransactor().
private TransactorNode tnode = null;
private TxStatus status = TxStatus.OPEN;
// Set to true in preCommit; close(boolean) skips its stale-scan check when this is true.
private boolean commitAttempted = false;
// Created on first use by getAsyncReader().
private AsyncReader asyncReader = null;
/**
 * Creates a transaction with an explicit start timestamp.
 *
 * @param env environment; must not be null
 * @param trigger notification that started this transaction, or null when there is none
 * @param startTs start timestamp; must not be negative
 */
public TransactionImpl(Environment env, Notification trigger, long startTs) {
Objects.requireNonNull(env, "environment cannot be null");
Preconditions.checkArgument(startTs >= 0, "startTs cannot be negative");
this.env = env;
this.stats = new TxStats(env);
this.startTs = startTs;
this.observedColumns = env.getConfiguredObservers().getObservedColumns(STRONG);
// A trigger on a weakly observed column is kept apart from one on any other column.
if (trigger != null
&& env.getConfiguredObservers().getObservedColumns(WEAK).contains(trigger.getColumn())) {
this.weakNotification = trigger;
} else {
this.notification = trigger;
if (notification != null) {
// Record the trigger's row/column in the pending updates using the notification sentinel.
Map<Column, Bytes> colUpdates = new HashMap<>();
colUpdates.put(notification.getColumn(), NTFY_VAL);
updates.put(notification.getRow(), colUpdates);
// Creates a triggered transaction whose start timestamp is the tx timestamp of a freshly
// allocated stamp.
public TransactionImpl(Environment env, Notification trigger) {
this(env, trigger, allocateTimestamp(env).getTxTimestamp());
// Creates a transaction without a trigger, using the tx timestamp of a freshly allocated stamp.
public TransactionImpl(Environment env) {
this(env, null, allocateTimestamp(env).getTxTimestamp());
// Creates a transaction without a trigger at the caller-supplied start timestamp.
public TransactionImpl(Environment env, long startTs) {
this(env, null, startTs);
// Asks the environment's shared timestamp tracker for a new stamp.
private static Stamp allocateTimestamp(Environment env) {
return env.getSharedResources().getTimestampTracker().allocateTimestamp();
// Reads one column by delegating to the multi-column overload with a singleton set; the result
// is whatever that map holds for column (null when the map has no entry for it).
public Bytes get(Bytes row, Column column) {
// TODO cache? precache?
return get(row, Collections.singleton(column)).get(column);
// Reads the given columns of a single row; thin wrapper over getImpl(row, columns).
public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
return getImpl(row, columns);
// Reads the given columns from several rows with a ParallelSnapshotScanner at this transaction's
// start timestamp.
public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
// Nothing to scan when either input is empty.
if (rows.isEmpty() || columns.isEmpty()) {
return Collections.emptyMap();
// The last constructor argument is a per-entry callback; getImpl(Collection, Consumer) passes
// its writeLocksSeen consumer to the corresponding constructor.
ParallelSnapshotScanner pss =
new ParallelSnapshotScanner(rows, columns, env, startTs, stats, readLocksSeen, kve -> {
Map<Bytes, Map<Column, Bytes>> ret = pss.scan();
// Record, per returned row, the columns that came back from the scan.
for (Entry<Bytes, Map<Column, Bytes>> entry : ret.entrySet()) {
updateColumnsRead(entry.getKey(), entry.getValue().keySet());
return ret;
// Reads specific row/column pairs by delegating to getImpl with a per-entry callback.
public Map<RowColumn, Bytes> get(Collection<RowColumn> rowColumns) {
return getImpl(rowColumns, kve -> {
// Scans a single row with a SnapshotScanner at startTs and collects column values into a map.
private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) {
// TODO push visibility filtering to server side?
// shouldCopy becomes true as soon as any requested column carries a visibility.
boolean shouldCopy = false;
for (Column column : columns) {
if (column.isVisibilitySet()) {
shouldCopy = true;
SnapshotScanner.Opts opts;
if (shouldCopy) {
// Columns with a visibility are re-created from family and qualifier only.
HashSet<Column> cols = new HashSet<>();
for (Column column : columns) {
if (column.isVisibilitySet()) {
cols.add(new Column(column.getFamily(), column.getQualifier()));
} else {
// NOTE(review): cols is populated above, yet both Opts constructions visible here are built
// from columns - confirm whether cols was meant to be passed instead.
opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
} else {
opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
Map<Column, Bytes> ret = new HashMap<>();
// Fetched from readLocksSeen only once a read-lock entry is actually encountered.
Set<Column> readLockCols = null;
for (Entry<Key, Value> kve : new SnapshotScanner(env, opts, startTs, stats)) {
Column col = ColumnUtil.convert(kve.getKey());
// Only relevant when visibilities were requested: detects entries for columns not asked for.
if (shouldCopy && !columns.contains(col)) {
// Read-lock entries are tracked in readLocksSeen; other entries become result values.
if (ColumnType.from(kve.getKey()) == ColumnType.RLOCK) {
if (readLockCols == null) {
readLockCols = readLocksSeen.computeIfAbsent(row, k -> new HashSet<>());
} else {
ret.put(col, Bytes.of(kve.getValue().get()));
// only update columns read after successful read
updateColumnsRead(row, columns);
return ret;
// Reads the given row/column pairs with a ParallelSnapshotScanner and flattens the per-row
// result into a RowColumn-keyed map. writeLocksSeen is handed to the scanner unchanged.
private Map<RowColumn, Bytes> getImpl(Collection<RowColumn> rowColumns,
Consumer<Entry<Key, Value>> writeLocksSeen) {
// Nothing to scan.
if (rowColumns.isEmpty()) {
return Collections.emptyMap();
ParallelSnapshotScanner pss =
new ParallelSnapshotScanner(rowColumns, env, startTs, stats, readLocksSeen, writeLocksSeen);
Map<Bytes, Map<Column, Bytes>> scan = pss.scan();
Map<RowColumn, Bytes> ret = new HashMap<>();
for (Entry<Bytes, Map<Column, Bytes>> entry : scan.entrySet()) {
// Record the columns returned for this row before flattening them.
updateColumnsRead(entry.getKey(), entry.getValue().keySet());
for (Entry<Column, Bytes> colVal : entry.getValue().entrySet()) {
ret.put(new RowColumn(entry.getKey(), colVal.getKey()), colVal.getValue());
return ret;
// Asynchronous single-column read; delegates to the lazily created AsyncReader.
public CompletableFuture<Bytes> getAsync(Bytes row, Column column) {
return getAsyncReader().get(row, column);
// Asynchronous single-column read with a default value; delegates to the AsyncReader, which
// receives defaultValue unchanged.
public CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) {
return getAsyncReader().get(row, column, defaultValue);
// String-typed asynchronous read; delegates to AsyncReader.gets.
public CompletableFuture<String> getsAsync(String row, Column column) {
return getAsyncReader().gets(row, column);
// String-typed asynchronous read with a default value; delegates to AsyncReader.gets.
public CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) {
return getAsyncReader().gets(row, column, defaultValue);
// Returns the AsyncReader bound to this transaction, creating it on first use.
private AsyncReader getAsyncReader() {
if (asyncReader == null) {
asyncReader = new AsyncReader(this);
return asyncReader;
// Returns a new scanner builder bound to this transaction.
public ScannerBuilder scanner() {
return new ScannerBuilderImpl(this);
// Looks up the set of columns recorded as read for row, creating and registering an empty set
// when the row has none yet.
// NOTE(review): no statement adding columns to that set is visible here - confirm it exists.
private void updateColumnsRead(Bytes row, Set<Column> columns) {
Set<Column> colsRead = columnsRead.get(row);
if (colsRead == null) {
colsRead = new HashSet<>();
columnsRead.put(row, colsRead);
/**
 * Marks row/col as read locked by storing the RLOCK_VAL sentinel in the pending updates.
 *
 * @throws IllegalArgumentException if col uses the reserved notify column family
 * @throws AlreadySetException if a write or delete is already pending for row/col
 */
void setReadLock(Bytes row, Column col) {
if (col.getFamily().equals(ColumnConstants.NOTIFY_CF)) {
throw new IllegalArgumentException(ColumnConstants.NOTIFY_CF + " is a reserved family");
Map<Column, Bytes> colUpdates = updates.computeIfAbsent(row, k -> new HashMap<>());
Bytes curVal = colUpdates.get(col);
// isWrite already returns true for the DELETE sentinel, so the isDelete test adds no cases.
if (curVal != null && (isWrite(curVal) || isDelete(curVal))) {
throw new AlreadySetException("Attemped read lock after write lock " + row + " " + col);
colUpdates.put(col, RLOCK_VAL);
// Returns a ReadLockSnapshot view wrapping this transaction.
public SnapshotBase withReadLock() {
return new ReadLockSnapshot(this);
/**
 * Stores value as the pending update for row/col.
 *
 * @throws IllegalArgumentException if col uses the reserved notify column family
 * @throws AlreadySetException if a write (including a delete) is already pending for row/col
 */
public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
if (col.getFamily().equals(ColumnConstants.NOTIFY_CF)) {
throw new IllegalArgumentException(ColumnConstants.NOTIFY_CF + " is a reserved family");
Map<Column, Bytes> colUpdates = updates.computeIfAbsent(row, k -> new HashMap<>());
Bytes curVal = colUpdates.get(col);
// A pending notification or read-lock sentinel is not a write, so it is simply replaced below.
if (curVal != null && isWrite(curVal)) {
throw new AlreadySetException("Value already set " + row + " " + col);
colUpdates.put(col, value);
/**
 * Requests a weak notification for row/col.
 *
 * @throws IllegalArgumentException if col is not among the columns configured for WEAK observers
 */
public void setWeakNotification(Bytes row, Column col) {
if (!env.getConfiguredObservers().getObservedColumns(WEAK).contains(col)) {
throw new IllegalArgumentException("Column not configured for weak notifications " + col);
// Fetch the row's column set, creating and registering it when absent.
Set<Column> columns = weakNotifications.get(row);
if (columns == null) {
columns = new HashSet<>();
weakNotifications.put(row, columns);
// Records a delete by storing the DELETE sentinel through set(), so set()'s checks apply.
public void delete(Bytes row, Column col) throws AlreadySetException {
set(row, col, DELETE);
/**
 * Adds the prewrite condition and entries for one column to a conditional mutation.
 *
 * @param cm mutation to extend, or null to create one for row
 * @param row row of the mutation; only used here when cm is null
 * @param col column being prewritten
 * @param val pending value for col (user data or a sentinel)
 * @param primaryRow row of the transaction's primary column
 * @param primaryColumn the transaction's primary column
 * @param isTriggerRow whether row is the row of the triggering notification
 * @return the mutation that was extended or created
 */
private ConditionalFlutation prewrite(ConditionalFlutation cm, Bytes row, Column col, Bytes val,
Bytes primaryRow, Column primaryColumn, boolean isTriggerRow) {
// Condition iterator configured with this transaction's start timestamp as snapshot time.
IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
PrewriteIterator.setSnaptime(iterConf, startTs);
// notification is dereferenced only when isTriggerRow is true (short-circuit &&).
boolean isTrigger = isTriggerRow && col.equals(notification.getColumn());
if (isTrigger) {
PrewriteIterator.enableAckCheck(iterConf, notification.getTimestamp());
if (isReadLock(val)) {
Condition cond = new FluoCondition(env, col).setIterators(iterConf);
if (cm == null) {
cm = new ConditionalFlutation(env, row, cond);
} else {
// Data is written only for real values, not for deletes or other sentinels.
if (isWrite(val) && !isDelete(val)) {
cm.put(col, ColumnType.DATA.encode(startTs), val.toArray());
// A read lock gets an RLOCK entry; everything else gets a LOCK entry.
if (isReadLock(val)) {
cm.put(col, ColumnType.RLOCK.encode(ReadLockUtil.encodeTs(startTs, false)),
ReadLockValue.encode(primaryRow, primaryColumn, getTransactorID()));
} else {
cm.put(col, ColumnType.LOCK.encode(startTs), LockValue.encode(primaryRow, primaryColumn,
isWrite(val), isDelete(val), isTriggerRow, getTransactorID()));
return cm;
// Convenience overload: passes a null mutation so that a new one is created for row.
private ConditionalFlutation prewrite(Bytes row, Column col, Bytes val, Bytes primaryRow,
Column primaryColumn, boolean isTriggerRow) {
return prewrite(null, row, col, val, primaryRow, primaryColumn, isTriggerRow);
// Convenience overload for extending an existing mutation; row is passed as null because the
// full overload only uses it when it has to create the mutation.
private void prewrite(ConditionalFlutation cm, Column col, Bytes val, Bytes primaryRow,
Column primaryColumn, boolean isTriggerRow) {
prewrite(cm, null, col, val, primaryRow, primaryColumn, isTriggerRow);
// Mutable state carried through the steps of a single commit attempt.
public static class CommitData {
ConditionalWriter cw;
// Primary row, column and pending value of the transaction being committed.
private Bytes prow;
private Column pcol;
private Bytes pval;
// Rows whose lock mutation was accepted; RollbackOtherLocks iterates these.
private HashSet<Bytes> acceptedRows;
// Row -> columns that collided; stays null until something is rejected.
private Map<Bytes, Set<Column>> rejected = null;
// Replaces the rejected map with a singleton map holding only the primary row/column.
private void addPrimaryToRejected() {
rejected = Collections.singletonMap(prow, Collections.singleton(pcol));
// Registers the columns rejected for row; a row may only be registered once.
private void addToRejected(Bytes row, Set<Column> columns) {
if (rejected == null) {
rejected = new HashMap<>();
Set<Column> ret = rejected.put(row, columns);
if (ret != null) {
throw new IllegalStateException();
// Never returns null: an empty map stands in while nothing has been rejected.
private Map<Bytes, Set<Column>> getRejected() {
if (rejected == null) {
return Collections.emptyMap();
return rejected;
public String toString() {
return prow + " " + pcol + " " + pval + " " + getRejected().size();
// Builds a short description of the rejected row/columns for failure reporting.
public String getShortCollisionMessage() {
StringBuilder sb = new StringBuilder();
if (!getRejected().isEmpty()) {
// Total number of rejected columns across all rows.
int numCollisions = 0;
for (Set<Column> cols : getRejected().values()) {
numCollisions += cols.size();
String sep = "";
outer: for (Entry<Bytes, Set<Column>> entry : getRejected().entrySet()) {
Bytes row = entry.getKey();
for (Column col : entry.getValue()) {
sep = ", ";
Hex.encNonAscii(sb, row);
sb.append(" ");
Hex.encNonAscii(sb, col, " ");
// Stop listing once the message exceeds 100 characters and mark the truncation.
if (sb.length() > 100) {
sb.append(" ...");
break outer;
return sb.toString();
// async stuff
private AsyncConditionalWriter acw;
private AsyncConditionalWriter bacw;
private AsyncCommitObserver commitObserver;
// True when this transaction has a (non-weak) notification and row is that notification's row.
private boolean isTriggerRow(Bytes row) {
return notification != null && notification.getRow().equals(row);
// Runs preCommit without naming a primary row/column (null is passed for it).
public boolean preCommit(CommitData cd) {
return preCommit(cd, null);
/**
 * Synchronous entry to the first phase of commit, using a SyncCommitObserver.
 *
 * @param cd commit data to fill in
 * @param primary row/column to pass to setUpBeginCommitAsync as primary, or null
 * @return false when a CommitException is caught, true otherwise
 * @throws AlreadyAcknowledgedException rethrown unchanged when caught
 */
public boolean preCommit(CommitData cd, RowColumn primary) {
// State changes are made while holding this transaction's monitor.
synchronized (this) {
status = TxStatus.COMMIT_STARTED;
commitAttempted = true;
SyncCommitObserver sco = new SyncCommitObserver();
cd = setUpBeginCommitAsync(cd, sco, primary);
if (cd != null) {
try {
} catch (AlreadyAcknowledgedException e) {
throw e;
} catch (CommitException e) {
return false;
return true;
/**
 * This function helps handle the following case
 *
 * <OL>
 * <LI>TX1 locks r1 col1
 * <LI>TX1 fails before unlocking
 * <LI>TX2 attempts to write r1:col1 w/o reading it
 * </OL>
 *
 * <p>
 * In this case TX2 would not roll back TX1, because it never read the column. This function
 * attempts to handle this case if TX2 fails. Only doing this in case of failures is cheaper than
 * trying to always read unread columns.
 *
 * @param cd Commit data
 */
// Collects rejected row/columns into a read list and reads them through getImpl, forwarding
// writeLocksSeen to that read.
private void readUnread(CommitData cd, Consumer<Entry<Key, Value>> writeLocksSeen) {
// TODO need to keep track of ranges read (not ranges passed in, but actual data read... user
// may not iterate over entire range
Collection<RowColumn> rowColumnsToRead = new ArrayList<>();
for (Entry<Bytes, Set<Column>> entry : cd.getRejected().entrySet()) {
Set<Column> rowColsRead = columnsRead.get(entry.getKey());
// Nothing was read from this row, so every rejected column of it is queued.
if (rowColsRead == null) {
for (Column column : entry.getValue()) {
rowColumnsToRead.add(new RowColumn(entry.getKey(), column));
} else {
// Work on a copy so the rejected set itself is left untouched.
// NOTE(review): no use of rowColsRead to narrow this copy is visible here - confirm.
HashSet<Column> colsToRead = new HashSet<>(entry.getValue());
if (!colsToRead.isEmpty()) {
for (Column column : colsToRead) {
rowColumnsToRead.add(new RowColumn(entry.getKey(), column));
getImpl(rowColumnsToRead, writeLocksSeen);
/**
 * Selects rejected columns on which a read lock was previously seen and asks LockResolver to
 * resolve open read locks on them.
 *
 * @param cd commit data holding the rejected row/columns
 * @param locksResolved row -> columns for which a write lock was already seen
 */
private void checkForOrphanedReadLocks(CommitData cd, Map<Bytes, Set<Column>> locksResolved)
throws Exception {
if (readLocksSeen.isEmpty()) {
Map<Bytes, Set<Column>> rowColsToCheck = new HashMap<>();
for (Entry<Bytes, Set<Column>> entry : cd.getRejected().entrySet()) {
Set<Column> resolvedColumns =
locksResolved.getOrDefault(entry.getKey(), Collections.emptySet());
// Allocated only if this row turns out to have a column worth checking.
Set<Column> colsToCheck = null;
Set<Column> readLockCols = readLocksSeen.get(entry.getKey());
if (readLockCols != null) {
// Candidates are rejected columns of this row on which a read lock was seen.
for (Column candidate : Sets.intersection(readLockCols, entry.getValue())) {
if (resolvedColumns.contains(candidate)) {
// A write lock was seen and this is probably what caused the collision, no need to
// check this column for read locks.
// Test whether this transaction's own pending update for the candidate is a read lock;
// a missing update is treated as EMPTY_BS, i.e. not a read lock.
if (!isReadLock(updates.getOrDefault(entry.getKey(), Collections.emptyMap())
.getOrDefault(candidate, EMPTY_BS))) {
if (colsToCheck == null) {
colsToCheck = new HashSet<>();
if (colsToCheck != null) {
rowColsToCheck.put(entry.getKey(), colsToCheck);
if (!rowColsToCheck.isEmpty()) {
long waitTime = SnapshotScanner.INITIAL_WAIT_TIME;
boolean resolved = false;
List<Entry<Key, Value>> openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
long startTime = System.currentTimeMillis();
// Keep resolving until LockResolver reports success; between attempts waitTime doubles up to
// MAX_WAIT_TIME and the open read locks are fetched again.
while (!resolved) {
resolved = LockResolver.resolveLocks(env, startTs, stats, openReadLocks, startTime);
if (!resolved) {
waitTime = Math.min(SnapshotScanner.MAX_WAIT_TIME, waitTime * 2);
openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
// Reads the rejected columns via readUnread, recording every row/column its callback reports,
// then checks for orphaned read locks with that record as the already-seen write locks.
private void checkForOrphanedLocks(CommitData cd) throws Exception {
Map<Bytes, Set<Column>> writeLocksSeen = new HashMap<>();
readUnread(cd, kve -> {
// Convert the reported key into row and column and remember the pair.
Bytes row = ByteUtil.toBytes(kve.getKey().getRowData());
Column col = ColumnUtil.convert(kve.getKey());
writeLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col);
checkForOrphanedReadLocks(cd, writeLocksSeen);
/**
 * Checks whether the mutation touches the triggering notification's column and, if so, scans
 * for an ACK in a range derived from the notification timestamp.
 *
 * @param cm mutation whose row and column updates are inspected
 * @return true when the scan finds an entry, false otherwise
 */
private boolean checkForAckCollision(ConditionalMutation cm) {
Bytes row = Bytes.of(cm.getRow());
// Only the trigger row can collide with an acknowledgement.
if (isTriggerRow(row)) {
List<ColumnUpdate> updates = cm.getUpdates();
for (ColumnUpdate cu : updates) {
// TODO avoid create col vis object
// NOTE(review): the Column constructor call below is cut off after its second argument.
Column col = new Column(Bytes.of(cu.getColumnFamily()), Bytes.of(cu.getColumnQualifier()),
if (notification.getColumn().equals(col)) {
// check to see if ACK exist after notification
// Both keys start from the notification's row/column; the end key additionally carries an
// ACK-encoded timestamp of notification timestamp + 1.
Key startKey = SpanUtil.toKey(notification.getRowColumn());
Key endKey = SpanUtil.toKey(notification.getRowColumn());
endKey.setTimestamp(ColumnType.ACK.encode(notification.getTimestamp() + 1));
Range range = new Range(startKey, endKey);
// try-with-resources closes the scanner when the block exits.
try (Scanner scanner =
env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
// TODO could use iterator that stops after 1st ACK. thought of using versioning iter
// but
// it scans to ACK
if (scanner.iterator().hasNext()) {
return true;
} catch (TableNotFoundException e) {
// TODO proper exception handling
throw new RuntimeException(e);
return false;
// Creates a CommitData and fills its writers from the environment's shared resources.
public CommitData createCommitData() {
// NOTE(review): the second statement on the next line has no assignment target; CommitData
// declares a ConditionalWriter field named cw - confirm that field is the intended target.
CommitData cd = new CommitData(); = env.getSharedResources().getConditionalWriter();
cd.acw = env.getSharedResources().getAsyncConditionalWriter();
cd.bacw = env.getSharedResources().getBulkAsyncConditionalWriter();
return cd;
// Synchronous commit using a SyncCommitObserver; runs while holding this object's monitor.
public synchronized void commit() throws CommitException {
try {
SyncCommitObserver sco = new SyncCommitObserver();
} finally {
// The weak trigger is always dropped, whether or not the commit succeeded.
weakNotification = null;
// When this transaction was started by a weak notification, writes the delete mutation that
// the notification builds for this start timestamp.
void deleteWeakRow() {
if (weakNotification != null) {
// NOTE(review): the receiver of writeMutation is missing on the next line.
.writeMutation(weakNotification.newDelete(env, startTs));
// Returns the statistics object created for this transaction in the constructor.
public TxStats getStats() {
return stats;
// Returns the start timestamp; same value as getStartTimestamp().
public long getStartTs() {
return startTs;
/**
 * Sets the transactor of this transaction
 *
 * @param tnode TransactorNode
 * @return this Transaction
 */
// Fluent setter: stores the node and returns this instance so calls can be chained.
public TransactionImpl setTransactor(TransactorNode tnode) {
this.tnode = tnode;
return this;
/**
 * Retrieves transactor ID by first getting/creating transactor (which is not done until
 * necessary)
 */
// Falls back to the shared resources' transactor node when none was set via setTransactor.
private Long getTransactorID() {
if (tnode == null) {
tnode = env.getSharedResources().getTransactorNode();
return tnode.getTransactorID().getLongID();
/**
 * Marks the transaction closed and optionally verifies that its reads were not stale.
 *
 * @param checkForStaleScan when true and no commit was attempted, compare startTs with the
 *        oracle's current GC timestamp
 * @throws StaleScanException if startTs is older than that GC timestamp
 */
private synchronized void close(boolean checkForStaleScan) {
if (asyncReader != null) {
// Only the first close changes the status and runs the check.
if (status != TxStatus.CLOSED) {
status = TxStatus.CLOSED;
if (checkForStaleScan && !commitAttempted) {
Stamp stamp = env.getSharedResources().getOracleClient().getStamp();
if (startTs < stamp.getGcTimestamp()) {
throw new StaleScanException();
// Public close entry point.
// NOTE(review): presumably delegates to close(boolean); the body is not visible - confirm.
public void close() {
// Guard for operations that require an open transaction; throws IllegalStateException with
// the current status when the transaction is in any state other than OPEN.
private synchronized void checkIfOpen() {
if (status != TxStatus.OPEN) {
throw new IllegalStateException("Transaction is no longer open! status = " + status);
// Currently does nothing; kept as a placeholder for the check described in the TODO.
protected void finalize() {
// TODO Log an error if transaction is not closed (See FLUO-486)
// Returns the start timestamp; same value as getStartTs().
public long getStartTimestamp() {
return startTs;
// Sums the byte lengths of everything held in updates and columnsRead. Weak notifications
// are not included in the total.
public int getSize() {
// TODO could calculate as items are added/set
int size = 0;
// Pending updates: row, then family + qualifier + visibility + value for each column.
for (Entry<Bytes, Map<Column, Bytes>> entry : updates.entrySet()) {
size += entry.getKey().length();
for (Entry<Column, Bytes> entry2 : entry.getValue().entrySet()) {
Column c = entry2.getKey();
size += c.getFamily().length();
size += c.getQualifier().length();
size += c.getVisibility().length();
size += entry2.getValue().length();
// Columns read: row, then family + qualifier + visibility for each column.
for (Entry<Bytes, Set<Column>> entry : columnsRead.entrySet()) {
size += entry.getKey().length();
for (Column c : entry.getValue()) {
size += c.getFamily().length();
size += c.getQualifier().length();
size += c.getVisibility().length();
return size;
// One link in a chain of commit operations. Each step has a main operation, an operation to
// run when the main one reports failure, and optionally a next step.
abstract class CommitStep {
private CommitStep nextStep;
// the boolean indicates if the operation was successful.
abstract CompletableFuture<Boolean> getMainOp(CommitData cd);
// create and run this op in the event that the main op was a failure
abstract CompletableFuture<Void> getFailureOp(CommitData cd);
// set the next step to run if this step is successful
// Returns next (not this), so a.andThen(b).andThen(c) links a -> b -> c.
CommitStep andThen(CommitStep next) {
this.nextStep = next;
return next;
// Runs this step and, on success, the rest of the chain. The continuation is executed on the
// async commit executor.
CompletableFuture<Void> compose(CommitData cd) {
return getMainOp(cd).thenComposeAsync(successful -> {
if (successful) {
if (nextStep != null) {
return nextStep.compose(cd);
} else {
// End of the chain: nothing more to run.
return CompletableFuture.completedFuture(null);
} else {
return getFailureOp(cd);
}, env.getSharedResources().getAsyncCommitExecutor());
// Commit step whose main operation submits conditional mutations and interprets the results.
abstract class ConditionalStep extends CommitStep {
// Mutations to submit for this step.
public abstract Collection<ConditionalMutation> createMutations(CommitData cd);
// Called when at least one result has status UNKNOWN; returns the results to process instead.
public abstract Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
throws Exception;
// Decides from the results whether the step succeeded.
public abstract boolean processResults(CommitData cd, Iterator<Result> results)
throws Exception;
// Writer used to submit the mutations; subclasses may override to pick a different one.
public AsyncConditionalWriter getACW(CommitData cd) {
return cd.acw;
CompletableFuture<Boolean> getMainOp(CommitData cd) {
// TODO not sure threading is correct
Executor ace = env.getSharedResources().getAsyncCommitExecutor();
return getACW(cd).apply(createMutations(cd)).thenCompose(results -> {
// ugh icky that this is an iterator, forces copy to inspect.. could refactor async CW to
// return collection
ArrayList<Result> resultsList = new ArrayList<>();
Iterators.addAll(resultsList, results);
boolean containsUknown = false;
for (Result result : resultsList) {
try {
containsUknown |= result.getStatus() == Status.UNKNOWN;
} catch (Exception e) {
// Surface the failure through the future instead of throwing a checked exception.
throw new CompletionException(e);
if (containsUknown) {
// process unknown in sync executor
Executor se = env.getSharedResources().getSyncCommitExecutor();
return CompletableFuture.supplyAsync(() -> {
try {
return handleUnknown(cd, resultsList.iterator());
} catch (Exception e) {
throw new CompletionException(e);
}, se);
} else {
// No unknown statuses: pass the copied results straight on.
return CompletableFuture.completedFuture(resultsList.iterator());
}).thenApplyAsync(results -> {
try {
return processResults(cd, results);
} catch (Exception e) {
throw new CompletionException(e);
}, ace);
// Conditional step that prewrites (locks) the transaction's primary row/column.
class LockPrimaryStep extends ConditionalStep {
// A single mutation: the prewrite of the primary column, with itself as the primary.
public Collection<ConditionalMutation> createMutations(CommitData cd) {
return Collections
.singleton(prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow)));
// Resolves an UNKNOWN status by inspecting the transaction state recorded at the primary
// row/column for this start timestamp.
public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
throws Exception {
Result result = Iterators.getOnlyElement(results);
Status mutationStatus = result.getStatus();
// TODO convert this code to async
while (mutationStatus == Status.UNKNOWN) {
TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
switch (txInfo.status) {
// NOTE(review): several case labels and parts of the return expressions in this switch are
// missing; only LOCKED is visible as a label.
case LOCKED:
return Collections
new Result(Status.ACCEPTED, result.getMutation(), result.getTabletServer()))
return Collections
new Result(Status.REJECTED, result.getMutation(), result.getTabletServer()))
// TODO async
// NOTE(review): the right-hand side of the next assignment is missing.
Result newResult =;
mutationStatus = newResult.getStatus();
if (mutationStatus != Status.UNKNOWN) {
return Collections.singleton(newResult).iterator();
// TODO handle case where another tx has the lock
throw new IllegalStateException(
"unexpected tx state " + txInfo.status + " " + cd.prow + " " + cd.pcol);
throw new IllegalStateException();
// The step succeeds only when the single result was ACCEPTED.
public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception {
Result result = Iterators.getOnlyElement(results);
return result.getStatus() == Status.ACCEPTED;
// Failure handling runs on the sync commit executor.
CompletableFuture<Void> getFailureOp(CommitData cd) {
// TODO can this be simplified by pushing some code to the superclass?
return CompletableFuture.supplyAsync(() -> {
// Rebuild the primary mutation so it can be checked for an ACK collision below.
final ConditionalMutation pcm = Iterables.getOnlyElement(createMutations(cd));
// TODO do async
try {
} catch (Exception e) {
throw new CompletionException(e);
if (checkForAckCollision(pcm)) {
} else {
return null;
}, env.getSharedResources().getSyncCommitExecutor());
// Conditional step that prewrites (locks) row/columns from the pending updates, building one
// mutation per row.
class LockOtherStep extends ConditionalStep {
// Uses the bulk async conditional writer instead of the default one.
public AsyncConditionalWriter getACW(CommitData cd) {
return cd.bacw;
public Collection<ConditionalMutation> createMutations(CommitData cd) {
ArrayList<ConditionalMutation> mutations = new ArrayList<>();
for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
// The first column of a row creates the mutation; later columns extend it.
ConditionalFlutation cm = null;
for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
if (cm == null) {
cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(), colUpdates.getValue(), cd.prow,
cd.pcol, false);
} else {
prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow, cd.pcol, false);
// Start with an empty accepted set for this attempt.
cd.acceptedRows = new HashSet<>();
return mutations;
public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results) {
// TODO this step does not currently handle unknown
return results;
// Records every non-accepted row as rejected, together with all of that row's updated
// columns. The step succeeds only when nothing was rejected.
public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception {
while (results.hasNext()) {
// NOTE(review): the right-hand side of the next assignment is missing.
Result result =;
// TODO handle unknown?
Bytes row = Bytes.of(result.getMutation().getRow());
if (result.getStatus() == Status.ACCEPTED) {
} else {
cd.addToRejected(row, updates.get(row).keySet());
return cd.getRejected().isEmpty();
// Runs the failure work on the sync commit executor, then rolls back the locks.
CompletableFuture<Void> getFailureOp(CommitData cd) {
return CompletableFuture.supplyAsync(() -> {
try {
// Does this need to be async?
} catch (Exception e) {
throw new CompletionException(e);
return null;
}, env.getSharedResources().getSyncCommitExecutor()).thenCompose(v -> rollbackLocks(cd));
// Commit step that writes unconditional mutations through the shared batch writer.
abstract class BatchWriterStep extends CommitStep {
// Mutations to write for this step.
public abstract Collection<Mutation> createMutations(CommitData cd);
// Reports success (true) once the asynchronous write future completes normally.
CompletableFuture<Boolean> getMainOp(CommitData cd) {
return env.getSharedResources().getBatchWriter()
.writeMutationsAsyncFuture(createMutations(cd)).thenApply(v -> true);
// The main op never reports false, so a failure op is never expected to be requested.
CompletableFuture<Void> getFailureOp(CommitData cd) {
throw new IllegalStateException("Failure not expected");
// Rolls back the non-primary locks first, then the primary lock, and finally notifies the
// commit observer of the failure with the short collision message.
private CompletableFuture<Void> rollbackLocks(CommitData cd) {
CommitStep firstStep = new RollbackOtherLocks();
firstStep.andThen(new RollbackPrimaryLock());
return firstStep.compose(cd)
.thenRun(() -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
// Builds the mutations that undo the locks taken on rows recorded in cd.acceptedRows.
class RollbackOtherLocks extends BatchWriterStep {
public Collection<Mutation> createMutations(CommitData cd) {
// roll back locks
// TODO let rollback be done lazily? this makes GC more difficult
Flutation m;
ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
for (Bytes row : cd.acceptedRows) {
m = new Flutation(env, row);
for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
// Read locks are undone with an RLOCK entry whose timestamp is encoded with the flag set to
// true (the prewrite used false); other locks get a DEL_LOCK rollback entry.
if (isReadLock(entry.getValue())) {
// NOTE(review): the value argument of the next put call is cut off.
m.put(entry.getKey(), ColumnType.RLOCK.encode(ReadLockUtil.encodeTs(startTs, true)),
} else {
m.put(entry.getKey(), ColumnType.DEL_LOCK.encode(startTs),
DelLockValue.encodeRollback(false, true));
return mutations;
// Builds the single mutation that rolls back the primary lock and writes a TX_DONE entry.
class RollbackPrimaryLock extends BatchWriterStep {
public Collection<Mutation> createMutations(CommitData cd) {
// mark transaction as complete for garbage collection purposes
Flutation m = new Flutation(env, cd.prow);
m.put(cd.pcol, ColumnType.DEL_LOCK.encode(startTs),
DelLockValue.encodeRollback(startTs, true, true));
// TX_DONE carries no payload, so the empty array is used as its value.
m.put(cd.pcol, ColumnType.TX_DONE.encode(startTs), EMPTY);
return Collections.singletonList(m);
// Terminal step used by commitPrimaryColumn; its main op always succeeds immediately.
class CommittedTestStep extends CommitStep {
CompletableFuture<Boolean> getMainOp(CommitData cd) {
return CompletableFuture.completedFuture(true);
// Never expected to run because the main op cannot report failure.
CompletableFuture<Void> getFailureOp(CommitData cd) {
throw new IllegalStateException("Failure not expected");
/**
 * Runs the commit chain through the primary-column commit using a caller-supplied stamp.
 * The chain is GetCommitStampStepTest, WriteNotificationsStep, CommitPrimaryStep,
 * CommittedTestStep.
 *
 * @param cd commit data; its commit observer is replaced with a new SyncCommitObserver
 * @param commitStamp stamp served by the first step instead of asking the oracle
 * @return false when a CommitException is caught, true otherwise
 * @throws FluoException wrapping any other exception
 */
public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
SyncCommitObserver sco = new SyncCommitObserver();
cd.commitObserver = sco;
try {
CommitStep firstStep = new GetCommitStampStepTest(commitStamp);
firstStep.andThen(new WriteNotificationsStep()).andThen(new CommitPrimaryStep())
.andThen(new CommittedTestStep());
// Exceptions raised inside the chain are routed to setFailed.
firstStep.compose(cd).exceptionally(throwable -> {
setFailed(cd, throwable);
return null;
} catch (CommitException e) {
return false;
} catch (Exception e) {
throw new FluoException(e);
return true;
// Step that obtains a stamp and fails the commit when this transaction's start timestamp is
// older than the stamp's GC timestamp.
class GetCommitStampStep extends CommitStep {
// Source of the stamp; overridable so a fixed stamp can be supplied.
protected CompletableFuture<Stamp> getStampOp() {
return env.getSharedResources().getOracleClient().getStampAsync();
CompletableFuture<Boolean> getMainOp(CommitData cd) {
return getStampOp().thenApply(commitStamp -> {
if (startTs < commitStamp.getGcTimestamp()) {
return false;
} else {
return true;
// On failure the locks taken so far are rolled back.
CompletableFuture<Void> getFailureOp(CommitData cd) {
return rollbackLocks(cd);
// Variant of GetCommitStampStep that serves a stamp fixed at construction time instead of
// asking the oracle client.
class GetCommitStampStepTest extends GetCommitStampStep {
private final Stamp testStamp;
public GetCommitStampStepTest(Stamp testStamp) {
this.testStamp = testStamp;
// Already-completed future holding the fixed stamp.
protected CompletableFuture<Stamp> getStampOp() {
return CompletableFuture.completedFuture(testStamp);
// Builds notification mutations, at most one per row, stamped with the commit timestamp taken
// from the transaction stats.
class WriteNotificationsStep extends BatchWriterStep {
public Collection<Mutation> createMutations(CommitData cd) {
long commitTs = getStats().getCommitTs();
// Keyed by row so that all notifications for a row share one mutation.
HashMap<Bytes, Mutation> mutations = new HashMap<>();
// Primary column: notify only if it is observed and carries a real value (not a delete).
if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
Flutation m = new Flutation(env, cd.prow);
Notification.put(env, m, cd.pcol, commitTs);
mutations.put(cd.prow, m);
// Pending updates: same rule, applied to every observed column.
for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
if (observedColumns.contains(colUpdates.getKey())) {
Bytes val = colUpdates.getValue();
if (isWrite(val) && !isDelete(val)) {
Mutation m = mutations.get(rowUpdates.getKey());
if (m == null) {
m = new Flutation(env, rowUpdates.getKey());
mutations.put(rowUpdates.getKey(), m);
Notification.put(env, m, colUpdates.getKey(), commitTs);
// Weak notifications requested through setWeakNotification are written unconditionally.
for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
Mutation m = mutations.get(entry.getKey());
if (m == null) {
m = new Flutation(env, entry.getKey());
mutations.put(entry.getKey(), m);
for (Column col : entry.getValue()) {
Notification.put(env, m, col, commitTs);
return mutations.values();
// Conditional step that commits the primary column, conditioned on this transaction's lock
// value still being present.
class CommitPrimaryStep extends ConditionalStep {
public Collection<ConditionalMutation> createMutations(CommitData cd) {
long commitTs = getStats().getCommitTs();
IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
PrewriteIterator.setSnaptime(iterConf, startTs);
boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
// The condition's expected value is the lock value encoded from this transaction's primary
// row/column, value flags, trigger flag and transactor id.
Condition lockCheck =
new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
// commitColumn fills the mutation with the entries that commit the primary column.
ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
return Collections.singletonList(delLockMutation);
// Resolves an UNKNOWN status by inspecting the transaction state recorded at the primary
// row/column for this start timestamp.
public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
throws Exception {
// the code for handling this is synchronous and needs to be handled in another thread pool
// TODO - how do we do the above without return a CF?
long commitTs = getStats().getCommitTs();
Result result = Iterators.getOnlyElement(results);
Status ms = result.getStatus();
while (ms == Status.UNKNOWN) {
// TODO async
TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
switch (txInfo.status) {
// NOTE(review): case labels other than LOCKED are missing in this switch.
// A recorded commit timestamp that differs from ours is treated as an illegal state.
if (txInfo.commitTs != commitTs) {
throw new IllegalStateException(
cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
ms = Status.ACCEPTED;
case LOCKED:
// TODO async
ConditionalMutation delLockMutation = result.getMutation();
// NOTE(review): the right-hand side of the next assignment is missing.
ms =;
ms = Status.REJECTED;
// Report the resolved status using the original mutation and tablet server.
Result newResult = new Result(ms, result.getMutation(), result.getTabletServer());
return Collections.singletonList(newResult).iterator();
// The step succeeds only when the single result was ACCEPTED.
public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception {
Result result = Iterators.getOnlyElement(results);
return result.getStatus() == Status.ACCEPTED;
// Returns a future that is already completed with null (obtained through NULLS).
CompletableFuture<Void> getFailureOp(CommitData cd) {
return CompletableFuture.completedFuture(NULLS.get());
/**
 * Runs only the tail of the commit pipeline: a {@code DeleteLocksStep} followed by a
 * {@code FinishCommitStep}. The error message below describes this as a test method.
 *
 * @param cd commit data; its {@code commitObserver} is replaced with a new
 *        {@code SyncCommitObserver}
 * @param commitStamp not referenced by the visible body
 * @return {@code true}
 */
public boolean finishCommit(CommitData cd, Stamp commitStamp) {
cd.commitObserver = new SyncCommitObserver();
CommitStep firstStep = new DeleteLocksStep();
firstStep.andThen(new FinishCommitStep());
// failures from the composed steps are reported on stderr rather than rethrown
firstStep.compose(cd).exceptionally(throwable -> {
System.err.println("Unexpected exception in finish commit test method : ");
return null;
return true;
/**
 * Batch-writer step that builds one {@code Flutation} per updated row. Judging by the class
 * name these mutations remove the transaction's locks - TODO confirm, the call that fills them
 * is not fully visible here.
 */
class DeleteLocksStep extends BatchWriterStep {
/**
 * Creates the mutations for every row in {@code updates}.
 *
 * @param cd commit data (not referenced by the visible body)
 * @return the list of mutations built by this step
 */
public Collection<Mutation> createMutations(CommitData cd) {
long commitTs = getStats().getCommitTs();
ArrayList<Mutation> mutations = new ArrayList<>(updates.size() + 1);
for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
Flutation m = new Flutation(env, rowUpdates.getKey());
boolean isTriggerRow = isTriggerRow(rowUpdates.getKey());
for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
// NOTE(review): the start of the call that takes the arguments below is missing in this
// copy of the file (the argument list resembles ColumnUtil.commitColumn as used in
// CommitPrimaryStep) - confirm against upstream
isTriggerRow && colUpdates.getKey().equals(notification.getColumn()), false,
colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()),
isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m);
return mutations;
/**
 * Final batch-writer step of the commit pipeline: marks the transaction as done on the primary
 * column and deletes the notifications held by this transaction.
 */
class FinishCommitStep extends BatchWriterStep {
/**
 * Wraps the superclass main operation so that its result is mapped to {@code true}.
 *
 * @param cd commit data passed through to the superclass
 * @return a future that yields {@code true} once the superclass operation completes normally
 */
CompletableFuture<Boolean> getMainOp(CommitData cd) {
return super.getMainOp(cd).thenApply(b -> {
return true;
/**
 * Creates the mutations written after the commit has been flushed.
 *
 * @param cd commit data supplying the primary row and column
 * @return the list of after-flush mutations
 */
public Collection<Mutation> createMutations(CommitData cd) {
long commitTs = getStats().getCommitTs();
ArrayList<Mutation> afterFlushMutations = new ArrayList<>(2);
Flutation m = new Flutation(env, cd.prow);
// mark transaction as complete for garbage collection purposes
m.put(cd.pcol, ColumnType.TX_DONE.encode(commitTs), EMPTY);
// delete the weak notification and the triggering notification, when present, using startTs
if (weakNotification != null) {
afterFlushMutations.add(weakNotification.newDelete(env, startTs));
if (notification != null) {
afterFlushMutations.add(notification.newDelete(env, startTs));
return afterFlushMutations;
/**
 * Starts an asynchronous commit of this transaction. The status is set to
 * {@code COMMIT_STARTED} and {@code commitAttempted} to {@code true} before any commit data is
 * created.
 *
 * @param commitCallback observer handed to {@code setUpBeginCommitAsync}
 */
public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
status = TxStatus.COMMIT_STARTED;
commitAttempted = true;
try {
CommitData cd = createCommitData();
// no explicit primary is supplied (null); a null result means there is nothing to commit
cd = setUpBeginCommitAsync(cd, commitCallback, null);
if (cd != null) {
} catch (Exception e) {
/**
 * Chooses the primary row/column for the commit and records it, together with the commit
 * observer, in the supplied commit data.
 *
 * <p>The primary is chosen in this order: the explicit {@code primary} argument, then the
 * triggering notification's row/column, then the first update that is not a read lock.
 *
 * @param cd commit data to populate ({@code prow}, {@code pcol}, {@code pval},
 *        {@code commitObserver})
 * @param commitCallback observer stored in {@code cd.commitObserver}
 * @param primary explicit primary row/column, or {@code null} to have one selected
 * @return {@code cd}, or {@code null} when there are no updates or only read locks
 * @throws IllegalArgumentException if both {@code primary} and a notification are present and
 *         {@code primary} differs from the notification's row/column
 */
private CommitData setUpBeginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
RowColumn primary) {
if (updates.isEmpty()) {
// TODO do async
return null;
for (Map<Column, Bytes> cols : updates.values()) {
Bytes primRow = null;
Column primCol = null;
if (primary != null) {
primRow = primary.getRow();
primCol = primary.getColumn();
// an explicit primary must match the triggering notification when there is one
if (notification != null && !primary.equals(notification.getRowColumn())) {
throw new IllegalArgumentException("Primary must be notification");
} else if (notification != null) {
primRow = notification.getRow();
primCol = notification.getColumn();
} else {
// no hint available: take the first row/column whose value is not a read lock
outer: for (Entry<Bytes, Map<Column, Bytes>> entry : updates.entrySet()) {
for (Entry<Column, Bytes> entry2 : entry.getValue().entrySet()) {
if (!isReadLock(entry2.getValue())) {
primRow = entry.getKey();
primCol = entry2.getKey();
break outer;
if (primRow == null) {
// there are only read locks, so nothing to write
return null;
// get a primary column
cd.prow = primRow;
Map<Column, Bytes> colSet = updates.get(cd.prow);
cd.pcol = primCol;
// the primary's value is removed from the row's update map and kept in cd.pval
cd.pval = colSet.remove(primCol);
if (colSet.isEmpty()) {
cd.commitObserver = commitCallback;
return cd;
/**
 * Reports a commit failure. If reporting itself throws a {@link RuntimeException}, both that
 * exception and the original throwable are logged so neither is lost.
 *
 * @param cd commit data of the failed commit
 * @param throwable the failure being reported
 */
private void setFailed(CommitData cd, Throwable throwable) {
try {
} catch (RuntimeException e) {
Logger log = LoggerFactory.getLogger(TransactionImpl.class);
// first entry: the exception raised while reporting; second: the original failure
log.error("Failed to set tx failure (startTs=" + startTs + ") cause ", e);
log.error("Failed to set tx failure (startTs=" + startTs + ") lost throwable ", throwable);
/**
 * Assembles the full asynchronous commit pipeline and starts it for the given commit data.
 * Step order: LockPrimaryStep, LockOtherStep, GetCommitStampStep, WriteNotificationsStep,
 * CommitPrimaryStep, DeleteLocksStep, FinishCommitStep. An exception from the composed pipeline
 * is passed to {@code setFailed}.
 *
 * @param cd commit data the pipeline operates on
 */
private void beginCommitAsync(CommitData cd) {
// Notifications are written between GetCommitStampStep and CommitPrimaryStep for the following
// reasons :
// * At this point all columns are locked, this guarantees that anything triggering as a
// result of this transaction will see all of this transaction's changes.
// * The transaction is not yet committed. If the process dies at this point whatever
// was running this transaction should rerun and recreate all of the notifications.
// The next transactions will rerun because this transaction will have to be rolled back.
// * If notifications are written in the 2nd phase of commit, then when the 2nd phase
// partially succeeds notifications may never be written. Because in the case of failure
// notifications would not be written until a column is read and it may never be read.
// See
// TODO(review): the reference that followed "See" is missing from this copy of the file.
// It's very important the notifications which trigger an observer are deleted after the 2nd
// phase of commit finishes.
CommitStep firstStep = new LockPrimaryStep();
firstStep.andThen(new LockOtherStep()).andThen(new GetCommitStampStep())
.andThen(new WriteNotificationsStep()).andThen(new CommitPrimaryStep())
.andThen(new DeleteLocksStep()).andThen(new FinishCommitStep());
firstStep.compose(cd).exceptionally(throwable -> {
setFailed(cd, throwable);
return null;
/**
 * Shortened variant of the commit pipeline: LockPrimaryStep, LockOtherStep, then
 * CommittedTestStep. An exception from the composed pipeline is passed to {@code setFailed}.
 *
 * @param cd commit data the pipeline operates on
 */
private void beginCommitAsyncTest(CommitData cd) {
CommitStep firstStep = new LockPrimaryStep();
firstStep.andThen(new LockOtherStep()).andThen(new CommittedTestStep());
firstStep.compose(cd).exceptionally(throwable -> {
setFailed(cd, throwable);
return null;
/**
 * Creates a snapshot scanner over the given span and columns, bound to this transaction's
 * environment, start timestamp and stats.
 *
 * @param span range to scan
 * @param columns columns to include
 * @return a new {@code SnapshotScanner}; the third {@code Opts} argument is passed as
 *         {@code false}
 */
public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) {
return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, false), startTs, stats);