blob: 27e63ef4c6b9dd6ee8bedcbabec9d704d1b8fc5b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.accumulo.iterators;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.values.WriteValue;
public class SnapshotIterator implements SortedKeyValueIterator<Key, Value> {
@VisibleForTesting
static final String TIMESTAMP_OPT = "timestampOpt";
static final String RETURN_READLOCK_PRESENT_OPT = "rrlpOpt";
private static final ByteSequence NOTIFY_CF_BS =
new ArrayByteSequence(ColumnConstants.NOTIFY_CF.toArray());
static final Set<ByteSequence> NOTIFY_CF_SET = Collections.singleton(NOTIFY_CF_BS);
private TimestampSkippingIterator source;
private long snaptime;
private boolean hasTop = false;
private boolean returnReadLockPresent = false;
private final Key curCol = new Key();
private Key readLockIgnore;
private Key readLockKey;
private Value readLockValue;
private void rememberReadLock(Key key, Value val) {
Preconditions.checkState(readLockKey == null && readLockValue == null);
if (readLockIgnore == null
|| !key.equals(readLockIgnore, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
readLockKey = new Key(key);
readLockValue = new Value(val);
}
}
private void ignoreReadLock(Key key) {
readLockIgnore = key;
}
private void clearReadLock() {
readLockKey = null;
readLockValue = null;
readLockIgnore = null;
}
private void findTop() throws IOException {
outer: while (source.hasTop() && readLockKey == null) {
long invalidationTime = -1;
long dataPointer = -1;
if (source.getTopKey().getColumnFamilyData().equals(NOTIFY_CF_BS)) {
throw new IllegalStateException("seeing notifications during snapshot iteration");
}
curCol.set(source.getTopKey());
while (source.hasTop()
&& curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
if (colType == ColumnConstants.TX_DONE_PREFIX) {
source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX);
continue;
} else if (colType == ColumnConstants.WRITE_PREFIX) {
long timePtr = WriteValue.getTimestamp(source.getTopValue().get());
if (timePtr > invalidationTime) {
invalidationTime = timePtr;
}
if (dataPointer == -1) {
if (ts <= snaptime) {
dataPointer = timePtr;
source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX);
continue;
} else {
source.skipToTimestamp(curCol, ColumnConstants.WRITE_PREFIX | snaptime);
continue;
}
}
} else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
if (ts > invalidationTime) {
invalidationTime = ts;
}
if (returnReadLockPresent) {
source.skipToPrefix(curCol, ColumnConstants.RLOCK_PREFIX);
} else {
source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
}
continue;
} else if (colType == ColumnConstants.RLOCK_PREFIX) {
if (returnReadLockPresent) {
rememberReadLock(source.getTopKey(), source.getTopValue());
}
source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
continue;
} else if (colType == ColumnConstants.LOCK_PREFIX) {
if (ts > invalidationTime && ts <= snaptime) {
// nothing supersedes this lock, therefore the column is locked
return;
} else {
if (dataPointer == -1) {
source.skipColumn(curCol);
continue outer;
} else {
source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | dataPointer);
continue;
}
}
} else if (colType == ColumnConstants.DATA_PREFIX) {
if (dataPointer == ts) {
// found data for this column
return;
}
if (ts < dataPointer || dataPointer == -1) {
source.skipColumn(curCol);
continue outer;
}
if (ts > dataPointer) {
source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | dataPointer);
continue;
}
} else if (colType == ColumnConstants.ACK_PREFIX) {
if (dataPointer == -1) {
source.skipColumn(curCol);
continue outer;
} else {
source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | dataPointer);
continue;
}
} else {
throw new IllegalArgumentException();
}
// TODO handle case where dataPointer >=0, but no data was found
source.next();
}
}
}
@Override
public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
IteratorEnvironment env) throws IOException {
this.source = new TimestampSkippingIterator(source);
this.snaptime = Long.parseLong(options.get(TIMESTAMP_OPT));
this.returnReadLockPresent =
Boolean.parseBoolean(options.getOrDefault(RETURN_READLOCK_PRESENT_OPT, "false"));
// TODO could require client to send version as a sanity check
}
@Override
public boolean hasTop() {
return hasTop && (readLockKey != null || source.hasTop());
}
@Override
public void next() throws IOException {
if (readLockKey != null) {
clearReadLock();
} else {
curCol.set(source.getTopKey());
source.skipColumn(curCol);
findTop();
}
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
Range newRange = range;
Collection<ByteSequence> cols;
boolean inc;
clearReadLock();
// handle continue case
hasTop = true;
if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE
&& !range.isStartKeyInclusive()) {
if ((range.getStartKey().getTimestamp()
& ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
Key currCol = new Key(range.getStartKey());
currCol.setTimestamp(Long.MAX_VALUE);
newRange = new Range(currCol, true, range.getEndKey(), range.isEndKeyInclusive());
ignoreReadLock(currCol);
} else {
Key nextCol = range.getStartKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS);
if (range.afterEndKey(nextCol)) {
hasTop = false;
return;
} else {
newRange = new Range(nextCol, true, range.getEndKey(), range.isEndKeyInclusive());
}
}
} else {
newRange = range;
}
if (columnFamilies.size() == 0 && inclusive == false) {
cols = NOTIFY_CF_SET;
inc = false;
} else {
cols = columnFamilies;
inc = inclusive;
}
source.seek(newRange, cols, inc);
findTop();
}
@Override
public Key getTopKey() {
if (readLockKey != null) {
return readLockKey;
} else {
return source.getTopKey();
}
}
@Override
public Value getTopValue() {
if (readLockValue != null) {
return readLockValue;
} else {
return source.getTopValue();
}
}
@Override
public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}
public static void setSnaptime(IteratorSetting cfg, long time) {
if (time < 0 || (ColumnConstants.PREFIX_MASK & time) != 0) {
throw new IllegalArgumentException();
}
cfg.addOption(TIMESTAMP_OPT, time + "");
}
public static void setReturnReadLockPresent(IteratorSetting cfg, boolean rrlp) {
cfg.addOption(RETURN_READLOCK_PRESENT_OPT, rrlp + "");
}
}