blob: ea00c73612ef5864631bf6b83c83a36dcca23f21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.core.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.iterators.OpenReadLockIterator;
import org.apache.fluo.accumulo.iterators.PrewriteIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.LockValue;
import org.apache.fluo.accumulo.values.ReadLockValue;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.ColumnUtil;
import org.apache.fluo.core.util.ConditionalFlutation;
import org.apache.fluo.core.util.FluoCondition;
import org.apache.fluo.core.util.SpanUtil;
import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK;
import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
/**
* This is utility code for either rolling forward or back failed transactions. A transaction is
* deemed to have failed if the reading transaction waited too long or the transactor id does not
* exist in zookeeper.
*/
public class LockResolver {
private static Map<PrimaryRowColumn, List<LockInfo>> groupLocksByPrimary(List<LockInfo> locks) {
Map<PrimaryRowColumn, List<LockInfo>> groupedLocks = new HashMap<>();
Map<PrimaryRowColumn, Long> transactorIds = new HashMap<>();
for (LockInfo lockInfo : locks) {
PrimaryRowColumn prc = new PrimaryRowColumn(lockInfo.prow, lockInfo.pcol, lockInfo.lockTs);
List<LockInfo> lockList = groupedLocks.computeIfAbsent(prc, k -> new ArrayList<>());
Long trid = transactorIds.get(prc);
if (trid == null) {
transactorIds.put(prc, lockInfo.transactorId);
} else if (!trid.equals(lockInfo.transactorId)) {
// sanity check.. its assumed that all locks w/ the same PrimaryRowColumn should have the
// same transactor id as well
throw new IllegalStateException("transactor ids not equals " + prc + " "
+ lockInfo.entry.getKey() + " " + trid + " " + lockInfo.transactorId);
}
lockList.add(lockInfo);
}
return groupedLocks;
}
private static class LockInfo {
final Bytes prow;
final Column pcol;
final Long transactorId;
final long lockTs;
final boolean isReadLock;
final Entry<Key, Value> entry;
public LockInfo(Entry<Key, Value> kve) {
long rawTs = kve.getKey().getTimestamp();
this.entry = kve;
if ((rawTs & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
this.lockTs = ReadLockUtil.decodeTs(rawTs);
ReadLockValue rlv = new ReadLockValue(kve.getValue().get());
this.prow = rlv.getPrimaryRow();
this.pcol = rlv.getPrimaryColumn();
this.transactorId = rlv.getTransactor();
this.isReadLock = true;
} else {
this.lockTs = kve.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
LockValue lv = new LockValue(kve.getValue().get());
this.prow = lv.getPrimaryRow();
this.pcol = lv.getPrimaryColumn();
this.transactorId = lv.getTransactor();
this.isReadLock = false;
}
}
}
/**
* Attempts to roll forward or roll back a set of locks encountered by a transaction reading data.
*
* @param env environment
* @param startTs The logical start time from the oracle of the transaction that encountered the
* lock
* @param stats Stats object for the transaction that encountered the lock
* @param locksKVs List of locks
* @param startTime The wall time that the transaction that encountered the lock first saw the
* lock
* @return true if all locks passed in were resolved (rolled forward or back)
*/
static boolean resolveLocks(Environment env, long startTs, TxStats stats,
List<Entry<Key, Value>> locksKVs, long startTime) {
// check if transactor is still alive
int numResolved = 0;
Map<ByteSequence, Mutation> mutations = new HashMap<>();
boolean timedOut = false;
TransactorCache transactorCache = env.getSharedResources().getTransactorCache();
List<LockInfo> locks = new ArrayList<>();
locksKVs.forEach(e -> locks.add(new LockInfo(e)));
List<LockInfo> locksToRecover;
if (System.currentTimeMillis() - startTime > env.getConfiguration()
.getTransactionRollbackTime()) {
locksToRecover = locks;
stats.incrementTimedOutLocks(locksToRecover.size());
timedOut = true;
} else {
locksToRecover = new ArrayList<>(locks.size());
for (LockInfo lockInfo : locks) {
if (transactorCache.checkTimedout(lockInfo.transactorId, lockInfo.lockTs)) {
locksToRecover.add(lockInfo);
stats.incrementTimedOutLocks();
} else if (!transactorCache.checkExists(lockInfo.transactorId)) {
locksToRecover.add(lockInfo);
stats.incrementDeadLocks();
}
}
}
Map<PrimaryRowColumn, List<LockInfo>> groupedLocks = groupLocksByPrimary(locksToRecover);
if (timedOut) {
Set<Entry<PrimaryRowColumn, List<LockInfo>>> es = groupedLocks.entrySet();
for (Entry<PrimaryRowColumn, List<LockInfo>> entry : es) {
long lockTs = entry.getKey().startTs;
Long transactorId = entry.getValue().get(0).transactorId;
transactorCache.addTimedoutTransactor(transactorId, lockTs, startTime);
}
}
TxInfoCache txiCache = env.getSharedResources().getTxInfoCache();
Set<Entry<PrimaryRowColumn, List<LockInfo>>> es = groupedLocks.entrySet();
for (Entry<PrimaryRowColumn, List<LockInfo>> group : es) {
TxInfo txInfo = txiCache.getTransactionInfo(group.getKey());
switch (txInfo.status) {
case COMMITTED:
commitColumns(env, group.getKey(), group.getValue(), txInfo.commitTs, mutations);
numResolved += group.getValue().size();
break;
case LOCKED:
if (rollbackPrimary(env, startTs, group.getKey(), txInfo.lockValue)) {
rollback(env, startTs, group.getKey(), group.getValue(), mutations);
numResolved += group.getValue().size();
}
break;
case ROLLED_BACK:
// TODO ensure this if ok if there concurrent rollback
rollback(env, startTs, group.getKey(), group.getValue(), mutations);
numResolved += group.getValue().size();
break;
case UNKNOWN:
default:
throw new IllegalStateException(
"can not abort : " + group.getKey() + " (" + txInfo.status + ")");
}
}
if (mutations.size() > 0) {
env.getSharedResources().getBatchWriter().writeMutations(new ArrayList<>(mutations.values()));
}
return numResolved == locks.size();
}
private static void rollback(Environment env, long startTs, PrimaryRowColumn prc,
List<LockInfo> value, Map<ByteSequence, Mutation> mutations) {
for (LockInfo lockInfo : value) {
if (isPrimary(prc, lockInfo.entry.getKey())) {
continue;
}
Mutation mut = getMutation(lockInfo.entry.getKey().getRowData(), mutations);
Key k = lockInfo.entry.getKey();
if (lockInfo.isReadLock) {
mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
k.getColumnVisibilityParsed(),
ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(lockInfo.lockTs, true),
DelReadLockValue.encodeRollback());
} else {
mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(),
k.getColumnVisibilityParsed(), ColumnConstants.DEL_LOCK_PREFIX | lockInfo.lockTs,
DelLockValue.encodeRollback(false, true));
}
}
}
private static boolean rollbackPrimary(Environment env, long startTs, PrimaryRowColumn prc,
byte[] lockValue) {
// TODO review use of PrewriteIter here
IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
PrewriteIterator.setSnaptime(iterConf, startTs);
ConditionalFlutation delLockMutation = new ConditionalFlutation(env, prc.prow,
new FluoCondition(env, prc.pcol).setIterators(iterConf).setValue(lockValue));
delLockMutation.put(prc.pcol, ColumnConstants.DEL_LOCK_PREFIX | prc.startTs,
DelLockValue.encodeRollback(true, true));
ConditionalWriter cw = null;
cw = env.getSharedResources().getConditionalWriter();
// TODO handle other conditional writer cases
try {
return cw.write(delLockMutation).getStatus() == Status.ACCEPTED;
} catch (AccumuloException e) {
throw new RuntimeException(e);
} catch (AccumuloSecurityException e) {
throw new RuntimeException(e);
}
}
private static void commitColumns(Environment env, PrimaryRowColumn prc, List<LockInfo> value,
long commitTs, Map<ByteSequence, Mutation> mutations) {
for (LockInfo lockInfo : value) {
if (isPrimary(prc, lockInfo.entry.getKey())) {
continue;
}
long lockTs = lockInfo.lockTs;
// TODO may be that a stronger sanity check that could be done here
if (commitTs < lockTs) {
throw new IllegalStateException(
"bad commitTs : " + lockInfo.entry.getKey() + " (" + commitTs + "<" + lockTs + ")");
}
Mutation mut = getMutation(lockInfo.entry.getKey().getRowData(), mutations);
Column col = ColumnUtil.convert(lockInfo.entry.getKey());
if (lockInfo.isReadLock) {
ColumnUtil.commitColumn(env, false, false, col, false, false, true, lockTs, commitTs,
env.getConfiguredObservers().getObservedColumns(STRONG), mut);
} else {
LockValue lv = new LockValue(lockInfo.entry.getValue().get());
ColumnUtil.commitColumn(env, lv.isTrigger(), false, col, lv.isWrite(), lv.isDelete(), false,
lockTs, commitTs, env.getConfiguredObservers().getObservedColumns(STRONG), mut);
}
}
}
private static Mutation getMutation(ByteSequence row, Map<ByteSequence, Mutation> mutations) {
Mutation mut = mutations.get(row);
if (mut == null) {
mut = new Mutation(row.toArray());
mutations.put(row, mut);
}
return mut;
}
private static boolean isPrimary(PrimaryRowColumn prc, Key k) {
return prc.prow.equals(ByteUtil.toBytes(k.getRowData()))
&& prc.pcol.equals(SpanUtil.toRowColumn(k).getColumn());
}
static List<Entry<Key, Value>> getOpenReadLocks(Environment env,
Map<Bytes, Set<Column>> rowColsToCheck) throws Exception {
List<Range> ranges = new ArrayList<>();
for (Entry<Bytes, Set<Column>> e1 : rowColsToCheck.entrySet()) {
for (Column col : e1.getValue()) {
Key start = SpanUtil.toKey(new RowColumn(e1.getKey(), col));
Key end = new Key(start);
end.setTimestamp(ColumnConstants.LOCK_PREFIX | ColumnConstants.TIMESTAMP_MASK);
ranges.add(new Range(start, true, end, false));
}
}
BatchScanner bscanner = null;
try {
bscanner = env.getConnector().createBatchScanner(env.getTable(), env.getAuthorizations(), 1);
bscanner.setRanges(ranges);
IteratorSetting iterCfg = new IteratorSetting(10, OpenReadLockIterator.class);
bscanner.addScanIterator(iterCfg);
List<Entry<Key, Value>> ret = new ArrayList<>();
for (Entry<Key, Value> entry : bscanner) {
if ((entry.getKey().getTimestamp() & PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
ret.add(entry);
}
}
return ret;
} finally {
if (bscanner != null) {
bscanner.close();
}
}
}
}