blob: 0e501ff00c488ee773fbee7938af7d94a28af0ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.statelib.impl.mvcc;
import static io.netty.util.ReferenceCountUtil.retain;
import static org.apache.bookkeeper.statelib.impl.Constants.NULL_END_KEY;
import static org.apache.bookkeeper.statelib.impl.Constants.NULL_START_KEY;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import com.google.common.primitives.UnsignedBytes;
import com.google.protobuf.TextFormat;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.kv.impl.op.OpFactoryImpl;
import org.apache.bookkeeper.api.kv.impl.result.DeleteResultImpl;
import org.apache.bookkeeper.api.kv.impl.result.IncrementResultImpl;
import org.apache.bookkeeper.api.kv.impl.result.KeyValueFactory;
import org.apache.bookkeeper.api.kv.impl.result.KeyValueImpl;
import org.apache.bookkeeper.api.kv.impl.result.PutResultImpl;
import org.apache.bookkeeper.api.kv.impl.result.RangeResultImpl;
import org.apache.bookkeeper.api.kv.impl.result.ResultFactory;
import org.apache.bookkeeper.api.kv.impl.result.TxnResultImpl;
import org.apache.bookkeeper.api.kv.op.CompareOp;
import org.apache.bookkeeper.api.kv.op.CompareResult;
import org.apache.bookkeeper.api.kv.op.CompareTarget;
import org.apache.bookkeeper.api.kv.op.DeleteOp;
import org.apache.bookkeeper.api.kv.op.IncrementOp;
import org.apache.bookkeeper.api.kv.op.Op;
import org.apache.bookkeeper.api.kv.op.OpFactory;
import org.apache.bookkeeper.api.kv.op.PutOp;
import org.apache.bookkeeper.api.kv.op.RangeOp;
import org.apache.bookkeeper.api.kv.op.TxnOp;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.bookkeeper.api.kv.options.RangeOption;
import org.apache.bookkeeper.api.kv.result.Code;
import org.apache.bookkeeper.api.kv.result.DeleteResult;
import org.apache.bookkeeper.api.kv.result.IncrementResult;
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.api.kv.result.PutResult;
import org.apache.bookkeeper.api.kv.result.RangeResult;
import org.apache.bookkeeper.api.kv.result.Result;
import org.apache.bookkeeper.api.kv.result.TxnResult;
import org.apache.bookkeeper.common.coder.Coder;
import org.apache.bookkeeper.common.kv.KV;
import org.apache.bookkeeper.common.kv.KVImpl;
import org.apache.bookkeeper.statelib.api.exceptions.InvalidStateStoreException;
import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
import org.apache.bookkeeper.statelib.api.kv.KVIterator;
import org.apache.bookkeeper.statelib.api.kv.KVMulti;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCStore;
import org.apache.bookkeeper.statelib.impl.Constants;
import org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore;
import org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
import org.apache.bookkeeper.stream.proto.kv.store.ValueType;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
/**
* MVCC Store Implementation.
*
* <p>The current implementation executes write operations in one single io thread.
* It can be improved later to leverage the revision numbers to achieve mvcc.
*/
@Slf4j
class MVCCStoreImpl<K, V> extends RocksdbKVStore<K, V> implements MVCCStore<K, V> {
private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
private final ResultFactory<K, V> resultFactory;
private final KeyValueFactory<K, V> recordFactory;
private final OpFactory<K, V> opFactory;
private final Coder<MVCCRecord> recordCoder = MVCCRecordCoder.of();
MVCCStoreImpl() {
this.resultFactory = new ResultFactory<>();
this.recordFactory = new KeyValueFactory<>();
this.opFactory = new OpFactoryImpl<>();
}
@Override
public OpFactory<K, V> getOpFactory() {
return opFactory;
}
@Override
public void put(K key, V value) {
throw new UnsupportedOperationException("Please use #put(PutOp op) instead");
}
@Override
public synchronized V putIfAbsent(K key, V value) {
throw new UnsupportedOperationException("Please use #put(PutOp op) instead");
}
@Override
public synchronized KVMulti<K, V> multi() {
throw new UnsupportedOperationException("Please use #txn(TxnOp op) instead");
}
@Override
public synchronized V delete(K key) {
throw new UnsupportedOperationException("Please use #delete(DeleteOp op) instead");
}
void increment(K key, long amount, long revision) {
try (IncrementOp<K, V> op = opFactory.newIncrement(key, amount, Options.blindIncrement())) {
try (IncrementResult<K, V> result = increment(revision, op)) {
if (Code.OK != result.code()) {
throw new MVCCStoreException(result.code(),
"Failed to increment (" + key + ", " + amount + ") to state store " + name);
}
}
}
}
void put(K key, V value, long revision) {
try (PutOp<K, V> op = opFactory.newPut(
key, value,
Options.blindPut())) {
try (PutResult<K, V> result = put(revision, op)) {
if (Code.OK != result.code()) {
throw new MVCCStoreException(result.code(),
"Failed to put (" + key + ", " + value + ", " + revision + ") to state store " + name);
}
}
}
}
void delete(K key, long revision) {
try (DeleteOp<K, V> op = opFactory.newDelete(
key,
Options.delete())) {
try (DeleteResult<K, V> result = delete(revision, op)) {
if (Code.OK != result.code()) {
throw new MVCCStoreException(result.code(),
"Failed to delete key=" + key + "from state store " + name);
}
}
}
}
void deleteRange(K key, K endKey, long revision) {
try (DeleteOp<K, V> op = opFactory.newDelete(
key,
opFactory.optionFactory().newDeleteOption()
.endKey(endKey)
.prevKv(false)
.build())) {
try (DeleteResult<K, V> result = delete(revision, op)) {
if (Code.OK != result.code()) {
throw new MVCCStoreException(result.code(),
"Failed to delete key=" + key + "from state store " + name);
}
}
}
}
Long getNumber(K key) {
try (RangeOp<K, V> op = opFactory.newRange(
key,
opFactory.optionFactory().newRangeOption()
.limit(1)
.build())) {
try (RangeResult<K, V> result = range(op)) {
if (Code.OK != result.code()) {
throw new MVCCStoreException(result.code(),
"Failed to retrieve key from store " + name + " : code = " + result.code());
}
if (result.count() <= 0) {
return null;
} else {
return result.kvs().get(0).numberValue();
}
}
}
}
@Override
public synchronized V get(K key) {
try (RangeOp<K, V> op = opFactory.newRange(
key,
opFactory.optionFactory().newRangeOption()
.limit(1)
.build())) {
try (RangeResult<K, V> result = range(op)) {
if (Code.OK != result.code()) {
throw new MVCCStoreException(result.code(),
"Failed to retrieve key from store " + name + " : code = " + result.code());
}
if (result.count() <= 0) {
return null;
} else {
return retain(result.kvs().get(0).value());
}
}
}
}
@Override
public synchronized KVIterator<K, V> range(K from, K to) {
checkStoreOpen();
RangeResultIterator iter = new RangeResultIterator(from, to);
kvIters.add(iter);
return iter;
}
class RangeResultIterator implements KVIterator<K, V> {
private final K to;
private K next;
private RangeResult<K, V> result;
private PeekingIterator<KeyValue<K, V>> resultIter;
private boolean eor = false;
private volatile boolean closed = false;
RangeResultIterator(K from, K to) {
this.to = to;
this.next = from;
}
private void ensureIteratorOpen() {
if (closed) {
throw new InvalidStateStoreException("MVCC state store " + name + " is already closed.");
}
}
@Override
public void close() {
kvIters.remove(this);
if (null != result) {
result.close();
}
closed = true;
}
private void getNextBatch() {
try (RangeOp<K, V> op = opFactory.newRange(
next,
opFactory.optionFactory().newRangeOption()
.endKey(to)
.limit(32)
.build())) {
this.result = range(op);
}
if (Code.OK != result.code()) {
throw new MVCCStoreException(result.code(),
"Failed to fetch kv pairs at range [" + next + ", " + to + "] from state store " + name);
}
this.resultIter = Iterators.peekingIterator(result.kvs().iterator());
}
private void skipFirstKey() {
while (this.resultIter.hasNext()) {
KeyValue<K, V> kv = this.resultIter.peek();
if (!kv.key().equals(next)) {
break;
}
this.resultIter.next();
}
}
@Override
public boolean hasNext() {
ensureIteratorOpen();
if (eor) {
return false;
}
if (null == result) {
getNextBatch();
}
if (!this.resultIter.hasNext()) {
if (this.result.more()) {
this.result.close();
getNextBatch();
skipFirstKey();
return hasNext();
} else {
eor = true;
return false;
}
}
return true;
}
@Override
public KV<K, V> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
KeyValue<K, V> kv = this.resultIter.next();
next = kv.key();
if (next.equals(to)) {
eor = true;
}
return new KVImpl<>(kv.key(), kv.value());
}
}
//
// Write View
//
private void executeBatch(WriteBatch batch) {
try {
db.write(writeOpts, batch);
} catch (RocksDBException e) {
throw new StateStoreRuntimeException("Error while executing a multi operation from state store " + name, e);
}
}
/**
* TODO: the increment operation can be optimized using rocksdb merge operator.
*/
@Override
public IncrementResult<K, V> increment(long revision, IncrementOp<K, V> op) {
try {
return processIncrement(revision, op);
} catch (MVCCStoreException e) {
IncrementResultImpl<K, V> result = resultFactory.newIncrementResult(revision);
result.code(e.getCode());
return result;
} catch (StateStoreRuntimeException e) {
IncrementResultImpl<K, V> result = resultFactory.newIncrementResult(revision);
result.code(Code.INTERNAL_ERROR);
return result;
}
}
synchronized IncrementResult<K, V> processIncrement(long revision, IncrementOp<K, V> op) {
checkStoreOpen();
WriteBatch batch = new WriteBatch();
IncrementResult<K, V> result = null;
try {
result = increment(revision, batch, op);
updateLastRevision(batch, revision);
executeBatch(batch);
return result;
} catch (StateStoreRuntimeException e) {
if (null != result) {
result.close();
}
throw e;
} finally {
RocksUtils.close(batch);
}
}
private IncrementResult<K, V> increment(long revision, WriteBatch batch, IncrementOp<K, V> op) {
// parameters
final K key = op.key();
final long amount = op.amount();
// raw key
final byte[] rawKey = keyCoder.encode(key);
MVCCRecord record;
try {
record = getKeyRecord(key, rawKey);
} catch (StateStoreRuntimeException e) {
throw e;
}
// result
final IncrementResultImpl<K, V> result = resultFactory.newIncrementResult(revision);
try {
long oldAmount = 0L;
if (null != record) {
// validate the update revision before applying the update to the record
if (record.compareModRev(revision) >= 0) {
result.code(Code.SMALLER_REVISION);
return result;
}
if (ValueType.NUMBER != record.getValueType()) {
result.code(Code.ILLEGAL_OP);
return result;
}
record.setVersion(record.getVersion() + 1);
oldAmount = record.getValue().getLong(0);
} else {
record = MVCCRecord.newRecord();
record.setCreateRev(revision);
record.setVersion(0L);
record.setValue(PooledByteBufAllocator.DEFAULT.buffer(Long.BYTES), ValueType.NUMBER);
}
long newAmount = oldAmount + amount;
record.getValue().writerIndex(0);
record.getValue().writeLong(newAmount);
record.setModRev(revision);
record.setExpireTime(System.currentTimeMillis() + (ttlSeconds * 1000));
// write the mvcc record back
batch.put(dataCfHandle, rawKey, recordCoder.encode(record));
// finalize the result
result.code(Code.OK);
if (op.option().getTotal()) {
result.totalAmount(newAmount);
}
return result;
} catch (RocksDBException rde) {
result.close();
throw new StateStoreRuntimeException(rde);
} catch (StateStoreRuntimeException e) {
result.close();
throw e;
} finally {
if (null != record) {
record.recycle();
}
}
}
@Override
public PutResult<K, V> put(long revision, PutOp<K, V> op) {
try {
return processPut(revision, op);
} catch (MVCCStoreException e) {
PutResultImpl<K, V> result = resultFactory.newPutResult(revision);
result.code(e.getCode());
return result;
} catch (StateStoreRuntimeException e) {
PutResultImpl<K, V> result = resultFactory.newPutResult(revision);
result.code(Code.INTERNAL_ERROR);
return result;
}
}
synchronized PutResult<K, V> processPut(long revision, PutOp<K, V> op) {
checkStoreOpen();
WriteBatch batch = new WriteBatch();
PutResult<K, V> result = null;
try {
result = put(revision, batch, op);
updateLastRevision(batch, revision);
executeBatch(batch);
return result;
} catch (StateStoreRuntimeException e) {
if (null != result) {
result.close();
}
throw e;
} finally {
RocksUtils.close(batch);
}
}
private PutResult<K, V> put(long revision, WriteBatch batch, PutOp<K, V> op) {
// parameters
final K key = op.key();
final V val = op.value();
// raw key & value
final byte[] rawKey = keyCoder.encode(key);
final ByteBuf rawValBuf = valCoder.encodeBuf(val);
MVCCRecord record;
try {
record = getKeyRecord(key, rawKey);
} catch (StateStoreRuntimeException e) {
ReferenceCountUtil.release(rawValBuf);
throw e;
}
// result
final PutResultImpl<K, V> result = resultFactory.newPutResult(revision);
MVCCRecord oldRecord = null;
try {
if (null != record) {
// validate the update revision before applying the update to the record
if (record.compareModRev(revision) >= 0) {
result.code(Code.SMALLER_REVISION);
return result;
}
if (ValueType.BYTES != record.getValueType()) {
result.code(Code.ILLEGAL_OP);
return result;
}
if (op.option().prevKv()) {
// make a copy before modification
oldRecord = record.duplicate();
}
record.setVersion(record.getVersion() + 1);
} else {
record = MVCCRecord.newRecord();
record.setCreateRev(revision);
record.setVersion(0);
}
record.setValue(rawValBuf, ValueType.BYTES);
record.setModRev(revision);
record.setExpireTime(System.currentTimeMillis() + (ttlSeconds * 1000));
// write the mvcc record back
batch.put(dataCfHandle, rawKey, recordCoder.encode(record));
// finalize the result
result.code(Code.OK);
if (null != oldRecord) {
KeyValueImpl<K, V> prevKV = oldRecord.asKVRecord(
recordFactory,
key,
valCoder);
result.prevKv(prevKV);
}
return result;
} catch (StateStoreRuntimeException e) {
result.close();
throw e;
} catch (RocksDBException e) {
result.close();
throw new StateStoreRuntimeException(e);
} finally {
if (null != record) {
record.recycle();
}
if (null != oldRecord) {
oldRecord.recycle();
}
}
}
//
// Delete Op
//
@Override
public DeleteResult<K, V> delete(long revision, DeleteOp<K, V> op) {
try {
return processDelete(revision, op);
} catch (MVCCStoreException e) {
DeleteResultImpl<K, V> result = resultFactory.newDeleteResult(revision);
result.code(e.getCode());
return result;
} catch (StateStoreRuntimeException e) {
DeleteResultImpl<K, V> result = resultFactory.newDeleteResult(revision);
result.code(Code.INTERNAL_ERROR);
return result;
}
}
synchronized DeleteResult<K, V> processDelete(long revision, DeleteOp<K, V> op) {
checkStoreOpen();
WriteBatch batch = new WriteBatch();
DeleteResult<K, V> result = null;
try {
result = delete(revision, batch, op, true);
updateLastRevision(batch, revision);
executeBatch(batch);
return result;
} catch (StateStoreRuntimeException e) {
if (null != result) {
result.close();
}
throw e;
} finally {
RocksUtils.close(batch);
}
}
DeleteResult<K, V> delete(long revision, WriteBatch batch, DeleteOp<K, V> op, boolean allowBlind) {
// parameters
final K key = op.key();
final K endKey = op.option().endKey();
final boolean blind = allowBlind && !op.option().prevKv();
final byte[] rawKey = (null != key) ? keyCoder.encode(key) : NULL_START_KEY;
final byte[] rawEndKey = (null != endKey) ? keyCoder.encode(endKey) : null;
// result
final DeleteResultImpl<K, V> result = resultFactory.newDeleteResult(revision);
final List<byte[]> keys = Lists.newArrayList();
final List<MVCCRecord> records = Lists.newArrayList();
try {
long numDeleted;
if (blind) {
deleteBlind(batch, rawKey, rawEndKey);
numDeleted = 0;
} else {
numDeleted = deleteUsingIter(
batch,
key,
rawKey,
rawEndKey,
keys,
records,
false);
}
List<KeyValue<K, V>> kvs = toKvs(keys, records);
result.code(Code.OK);
result.prevKvs(kvs);
result.numDeleted(numDeleted);
} catch (StateStoreRuntimeException e) {
result.close();
throw e;
} finally {
records.forEach(MVCCRecord::recycle);
}
return result;
}
/**
* Delete blind should be call as the last op in the delete operations.
* Since we need to modify endKey to make {@link WriteBatch#deleteRange(byte[], byte[])}
* delete the end key.
*/
void deleteBlind(WriteBatch batch,
byte[] key,
@Nullable byte[] endKey) {
try {
if (null == endKey) {
batch.delete(key);
} else {
Pair<byte[], byte[]> realRange = getRealRange(key, endKey);
endKey = realRange.getRight();
++endKey[endKey.length - 1];
batch.deleteRange(realRange.getLeft(), endKey);
}
} catch (RocksDBException e) {
throw new StateStoreRuntimeException(e);
}
}
long deleteUsingIter(WriteBatch batch,
K key,
byte[] rawKey,
@Nullable byte[] rawEndKey,
List<byte[]> resultKeys,
List<MVCCRecord> resultValues,
boolean countOnly) {
MutableLong numKvs = new MutableLong(0L);
if (null == rawEndKey) {
MVCCRecord record = getKeyRecord(key, rawKey);
if (null != record) {
if (!countOnly) {
resultKeys.add(rawKey);
resultValues.add(record);
} else {
record.recycle();
}
numKvs.add(1L);
try {
batch.delete(rawKey);
} catch (RocksDBException e) {
throw new StateStoreRuntimeException(e);
}
}
} else {
Pair<byte[], byte[]> realRange = getRealRange(rawKey, rawEndKey);
rawKey = realRange.getLeft();
rawEndKey = realRange.getRight();
getKeyRecords(
rawKey,
rawEndKey,
resultKeys,
resultValues,
numKvs,
null,
-1,
countOnly);
deleteBlind(batch, rawKey, rawEndKey);
}
return numKvs.longValue();
}
//
// Txn Op
//
@Override
public synchronized TxnResult<K, V> txn(long revision, TxnOp<K, V> op) {
try {
return processTxn(revision, op);
} catch (MVCCStoreException e) {
TxnResultImpl<K, V> result = resultFactory.newTxnResult(revision);
result.code(e.getCode());
return result;
} catch (StateStoreRuntimeException e) {
TxnResultImpl<K, V> result = resultFactory.newTxnResult(revision);
result.code(Code.INTERNAL_ERROR);
return result;
}
}
synchronized TxnResult<K, V> processTxn(long revision, TxnOp<K, V> op) {
checkStoreOpen();
// 1. process the compares
boolean success = processCompares(op);
// 2. prepare the response list
List<Op<K, V>> operations;
List<Result<K, V>> results;
if (success) {
operations = op.successOps();
} else {
operations = op.failureOps();
}
if (operations == null) {
operations = Collections.emptyList();
}
results = Lists.newArrayListWithExpectedSize(operations.size());
// 3. process the operations
try (WriteBatch batch = new WriteBatch()) {
for (Op<K, V> o : operations) {
results.add(executeOp(revision, batch, o));
}
updateLastRevision(batch, revision);
executeBatch(batch);
// 4. repare the result
TxnResultImpl<K, V> txnResult = resultFactory.newTxnResult(revision);
txnResult.isSuccess(success);
txnResult.results(results);
txnResult.code(Code.OK);
return txnResult;
} catch (StateStoreRuntimeException e) {
results.forEach(Result::close);
throw e;
}
}
boolean processCompareOp(CompareOp<K, V> op) {
MVCCRecord record = null;
K key = op.key();
byte[] rawKey = keyCoder.encode(key);
try {
record = getKeyRecord(key, rawKey);
if (null == record) {
if (CompareTarget.VALUE != op.target()) {
throw new MVCCStoreException(Code.KEY_NOT_FOUND,
"Key '" + TextFormat.escapeBytes(rawKey) + "' is not found");
}
}
return processCompareOp(record, op);
} finally {
if (null != record) {
record.recycle();
}
}
}
boolean processCompareOp(@Nullable MVCCRecord record,
CompareOp<K, V> op) {
int cmp;
switch (op.target()) {
case MOD:
cmp = record.compareModRev(op.revision());
break;
case CREATE:
cmp = record.compareCreateRev(op.revision());
break;
case VERSION:
cmp = record.compareVersion(op.revision());
break;
case VALUE:
if (null == record) { // key not found
if (CompareResult.EQUAL == op.result()) {
return op.value() == null;
} else if (CompareResult.NOT_EQUAL == op.result()) {
return op.value() != null;
} else {
return false;
}
}
// key is found and value-to-compare is present
if (op.value() != null) {
byte[] rawValue = valCoder.encode(op.value());
cmp = record.getValue().compareTo(Unpooled.wrappedBuffer(rawValue));
} else {
// key is found but value-to-compare is missing
switch (op.result()) {
case EQUAL:
case LESS:
return false;
default:
return true;
}
}
break;
default:
return false;
}
boolean success;
switch (op.result()) {
case LESS:
success = cmp < 0;
break;
case EQUAL:
success = cmp == 0;
break;
case GREATER:
success = cmp > 0;
break;
case NOT_EQUAL:
success = cmp != 0;
break;
default:
success = false;
break;
}
return success;
}
boolean processCompares(TxnOp<K, V> op) {
for (CompareOp<K, V> compare : op.compareOps()) {
if (processCompareOp(compare)) {
continue;
}
return false;
}
return true;
}
private Result<K, V> executeOp(long revision, WriteBatch batch, Op<K, V> op) {
if (op instanceof PutOp) {
return put(revision, batch, (PutOp<K, V>) op);
} else if (op instanceof DeleteOp) {
return delete(revision, batch, (DeleteOp<K, V>) op, true);
} else if (op instanceof RangeOp) {
return range((RangeOp<K, V>) op);
} else {
throw new MVCCStoreException(Code.ILLEGAL_OP, "Unknown operation in a transaction : " + op);
}
}
//
// Read View
//
private boolean getKeyRecords(byte[] rawKey,
byte[] rawEndKey,
List<byte[]> resultKeys,
List<MVCCRecord> resultValues,
MutableLong numKvs,
RangeOption<K> rangeOption,
long limit,
boolean countOnly) {
try (RocksIterator iter = db.newIterator(dataCfHandle)) {
iter.seek(rawKey);
boolean eor = false;
while (iter.isValid() && (limit < 0 || resultKeys.size() < limit)) {
byte[] key = iter.key();
if (COMPARATOR.compare(rawEndKey, key) < 0) {
eor = true;
break;
}
MVCCRecord val = recordCoder.decode(iter.value());
if (val.expired()) {
val.recycle();
continue;
}
processRecord(key, val, resultKeys, resultValues, numKvs, rangeOption, countOnly);
iter.next();
}
if (eor) {
return false;
} else {
return iter.isValid();
}
}
}
private void processRecord(byte[] key,
MVCCRecord record,
List<byte[]> resultKeys,
List<MVCCRecord> resultValues,
MutableLong numKvs,
RangeOption<K> rangeOption,
boolean countOnly) {
if (null == rangeOption && countOnly) {
numKvs.increment();
return;
}
if (record.test(rangeOption)) {
numKvs.increment();
if (countOnly) {
record.recycle();
} else {
resultKeys.add(key);
resultValues.add(record);
}
} else {
record.recycle();
}
}
private MVCCRecord getKeyRecord(K key, byte[] keyBytes) {
try {
byte[] valBytes = this.db.get(dataCfHandle, keyBytes);
if (null == valBytes) {
return null;
}
MVCCRecord record = recordCoder.decode(valBytes);
if (record.expired()) {
record.recycle();
record = null;
}
return record;
} catch (RocksDBException e) {
throw new StateStoreRuntimeException("Error while getting value for key "
+ key + " from state store " + name, e);
}
}
@Override
public RangeResult<K, V> range(RangeOp<K, V> rangeOp) {
try {
return processRange(rangeOp);
} catch (MVCCStoreException e) {
RangeResultImpl<K, V> result = resultFactory.newRangeResult(-1L);
result.code(e.getCode());
return result;
} catch (StateStoreRuntimeException e) {
RangeResultImpl<K, V> result = resultFactory.newRangeResult(-1L);
result.code(Code.INTERNAL_ERROR);
return result;
}
}
synchronized RangeResult<K, V> processRange(RangeOp<K, V> rangeOp) {
checkStoreOpen();
// parameters
final K key = rangeOp.key();
final K endKey = rangeOp.option().endKey();
// result
final RangeResultImpl<K, V> result = resultFactory.newRangeResult(-1L);
// raw key
byte[] rawKey = (null != key) ? keyCoder.encode(key) : NULL_START_KEY;
byte[] rawEndKey = NULL_END_KEY;
if (null == endKey) {
// point lookup
MVCCRecord record = getKeyRecord(key, rawKey);
try {
if (null == record || !record.test(rangeOp.option())) {
result.count(0);
result.kvs(Collections.emptyList());
} else {
result.count(1);
result.kvs(Lists.newArrayList(record.asKVRecord(
recordFactory,
key,
valCoder)));
}
result.more(false);
result.code(Code.OK);
return result;
} finally {
if (null != record) {
record.recycle();
}
}
} else {
rawEndKey = keyCoder.encode(endKey);
}
Pair<byte[], byte[]> realRange = getRealRange(rawKey, rawEndKey);
rawKey = realRange.getLeft();
rawEndKey = realRange.getRight();
// range lookup
List<byte[]> keys = Lists.newArrayList();
List<MVCCRecord> records = Lists.newArrayList();
MutableLong numKvs = new MutableLong(0L);
try {
boolean hasMore = getKeyRecords(
rawKey,
rawEndKey,
keys,
records,
numKvs,
rangeOp.option(),
rangeOp.option().limit(),
false);
List<KeyValue<K, V>> kvs = toKvs(keys, records);
result.code(Code.OK);
result.kvs(kvs);
result.count(kvs.size());
result.more(hasMore);
} finally {
records.forEach(MVCCRecord::recycle);
}
return result;
}
private List<KeyValue<K, V>> toKvs(List<byte[]> keys, List<MVCCRecord> records) {
List<KeyValue<K, V>> kvs = Lists.newArrayListWithExpectedSize(keys.size());
for (int i = 0; i < keys.size(); i++) {
byte[] keyBytes = keys.get(i);
MVCCRecord record = records.get(i);
kvs.add(record.asKVRecord(
recordFactory,
keyCoder.decode(keyBytes),
valCoder
));
}
return kvs;
}
private Pair<byte[], byte[]> getRealRange(byte[] rawKey, byte[] rawEndKey) {
boolean isNullStartKey = Constants.isNullStartKey(rawKey);
boolean isNullEndKey = Constants.isNullEndKey(rawEndKey);
if (isNullStartKey || isNullEndKey) {
try (RocksIterator iter = db.newIterator(dataCfHandle)) {
if (isNullStartKey) {
iter.seekToFirst();
if (!iter.isValid()) {
// no key to delete
return null;
}
rawKey = iter.key();
}
if (isNullEndKey) {
iter.seekToLast();
if (!iter.isValid()) {
// no key to delete
return null;
}
rawEndKey = iter.key();
}
}
}
return Pair.of(rawKey, rawEndKey);
}
}