blob: feb4094094c1f21bba406a316d003cdd22edb854 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.statelib.impl.mvcc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.io.Files;
import java.io.File;
import java.util.List;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.kv.op.CompareResult;
import org.apache.bookkeeper.api.kv.op.OpType;
import org.apache.bookkeeper.api.kv.op.RangeOp;
import org.apache.bookkeeper.api.kv.op.TxnOp;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.bookkeeper.api.kv.result.Code;
import org.apache.bookkeeper.api.kv.result.DeleteResult;
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.api.kv.result.PutResult;
import org.apache.bookkeeper.api.kv.result.RangeResult;
import org.apache.bookkeeper.api.kv.result.Result;
import org.apache.bookkeeper.api.kv.result.TxnResult;
import org.apache.bookkeeper.common.coder.StringUtf8Coder;
import org.apache.bookkeeper.common.kv.KV;
import org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
import org.apache.bookkeeper.statelib.api.kv.KVIterator;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
/**
* Unit test of {@link MVCCStoreImpl}.
*/
@Slf4j
public class TestMVCCStoreImpl {
@Rule
public final TestName runtime = new TestName();
private File tempDir;
private StateStoreSpec spec;
private MVCCStoreImpl<String, String> store;
@Before
public void setUp() {
tempDir = Files.createTempDir();
spec = StateStoreSpec.builder()
.name(runtime.getMethodName())
.keyCoder(StringUtf8Coder.of())
.valCoder(StringUtf8Coder.of())
.localStateStoreDir(tempDir)
.stream(runtime.getMethodName())
.build();
store = new MVCCStoreImpl<>();
}
@After
public void tearDown() throws Exception {
if (null != store) {
store.close();
}
if (null != tempDir) {
FileUtils.deleteDirectory(tempDir);
}
}
//
// Operations
//
@Test
public void testGetNull() throws Exception {
store.init(spec);
assertNull(store.get("key"));
}
@Test
public void testGetValue() throws Exception {
store.init(spec);
store.put("key", "value", 1L);
assertEquals("value", store.get("key"));
}
@Test
public void testPutValueSmallerRevision() throws Exception {
store.init(spec);
store.put("key", "value", 2L);
assertEquals("value", store.get("key"));
try {
store.put("key", "newValue", 1L);
fail("Should fail to put a value with smaller revision");
} catch (MVCCStoreException e) {
assertEquals(Code.SMALLER_REVISION, e.getCode());
}
}
private String getKey(int i) {
return String.format("key-%05d", i);
}
private String getValue(int i) {
return String.format("value-%05d", i);
}
private void writeKVs(int numPairs, long revision) {
for (int i = 0; i < numPairs; i++) {
store.put(getKey(i), getValue(i), revision);
}
}
@Test
public void testAllRange() throws Exception {
store.init(spec);
writeKVs(100, 1L);
KVIterator<String, String> iter = store.range(
getKey(0),
getKey(100));
int idx = 0;
while (iter.hasNext()) {
KV<String, String> kv = iter.next();
assertEquals(getKey(idx), kv.key());
assertEquals(getValue(idx), kv.value());
++idx;
}
assertEquals(100, idx);
iter.close();
}
@Test
public void testSubRange() throws Exception {
store.init(spec);
writeKVs(100, 1L);
KVIterator<String, String> iter = store.range(
getKey(20),
getKey(79));
int idx = 20;
while (iter.hasNext()) {
KV<String, String> kv = iter.next();
assertEquals(getKey(idx), kv.key());
assertEquals(getValue(idx), kv.value());
++idx;
}
assertEquals(80, idx);
iter.close();
}
@Test
public void testRangeOp() throws Exception {
store.init(spec);
long revision = 99L;
writeKVs(100, revision);
@Cleanup RangeOp<String, String> op = store.getOpFactory().newRange(
getKey(20),
store.getOpFactory().optionFactory().newRangeOption()
.endKey(getKey(79))
.limit(100)
.build());
@Cleanup RangeResult<String, String> result = store.range(op);
assertEquals(Code.OK, result.code());
assertEquals(60, result.count());
assertEquals(60, result.kvs().size());
assertEquals(false, result.more());
int idx = 20;
for (KeyValue<String, String> record : result.kvs()) {
assertEquals(getKey(idx), record.key());
assertEquals(getValue(idx), record.value());
assertEquals(revision, record.createRevision());
assertEquals(revision, record.modifiedRevision());
assertEquals(0, record.version());
++idx;
}
assertEquals(80, idx);
}
@Test
public void testRangeOpNoMoreKvs() throws Exception {
store.init(spec);
long revision = 99L;
writeKVs(100, revision);
@Cleanup RangeOp<String, String> op = store.getOpFactory().newRange(
getKey(20),
store.getOpFactory().optionFactory().newRangeOption()
.endKey(getKey(99))
.limit(100)
.build());
@Cleanup RangeResult<String, String> result = store.range(op);
assertEquals(Code.OK, result.code());
assertEquals(80, result.count());
assertEquals(80, result.kvs().size());
assertEquals(false, result.more());
int idx = 20;
for (KeyValue<String, String> record : result.kvs()) {
assertEquals(getKey(idx), record.key());
assertEquals(getValue(idx), record.value());
assertEquals(revision, record.createRevision());
assertEquals(revision, record.modifiedRevision());
assertEquals(0, record.version());
++idx;
}
assertEquals(100, idx);
}
@Test
public void testRangeOpHasMoreKvs() throws Exception {
store.init(spec);
long revision = 99L;
writeKVs(100, revision);
@Cleanup RangeOp<String, String> op = store.getOpFactory().newRange(
getKey(20),
store.getOpFactory().optionFactory().newRangeOption()
.endKey(getKey(79))
.limit(20)
.build());
@Cleanup RangeResult<String, String> result = store.range(op);
assertEquals(Code.OK, result.code());
assertEquals(20, result.count());
assertEquals(20, result.kvs().size());
assertEquals(true, result.more());
int idx = 20;
for (KeyValue<String, String> record : result.kvs()) {
assertEquals(getKey(idx), record.key());
assertEquals(getValue(idx), record.value());
assertEquals(revision, record.createRevision());
assertEquals(revision, record.modifiedRevision());
assertEquals(0, record.version());
++idx;
}
assertEquals(40, idx);
}
@Test
public void testDeleteKey() throws Exception {
store.init(spec);
store.put("key", "value", 99L);
assertEquals("value", store.get("key"));
store.delete("key", 100L);
assertNull(store.get("key"));
}
@Test
public void testDeleteHeadRange() throws Exception {
store.init(spec);
// write 100 kvs
writeKVs(100, 99L);
// iterate all kvs
KVIterator<String, String> iter = store.range(
getKey(0), getKey(100));
int idx = 0;
while (iter.hasNext()) {
KV<String, String> kv = iter.next();
assertEquals(getKey(idx), kv.key());
assertEquals(getValue(idx), kv.value());
++idx;
}
assertEquals(100, idx);
iter.close();
// delete range
store.deleteRange(getKey(0), getKey(20), 100L);
iter = store.range(
getKey(0), getKey(100));
idx = 21;
while (iter.hasNext()) {
KV<String, String> kv = iter.next();
assertEquals(getKey(idx), kv.key());
assertEquals(getValue(idx), kv.value());
++idx;
}
assertEquals(100, idx);
iter.close();
}
@Test
public void testDeleteTailRange() throws Exception {
store.init(spec);
// write 100 kvs
writeKVs(100, 99L);
// iterate all kvs
KVIterator<String, String> iter = store.range(
getKey(0), getKey(100));
int idx = 0;
while (iter.hasNext()) {
KV<String, String> kv = iter.next();
assertEquals(getKey(idx), kv.key());
assertEquals(getValue(idx), kv.value());
++idx;
}
assertEquals(100, idx);
iter.close();
// delete range
store.deleteRange(getKey(10), getKey(100), 100L);
iter = store.range(
getKey(0), getKey(100));
idx = 0;
while (iter.hasNext()) {
KV<String, String> kv = iter.next();
assertEquals(getKey(idx), kv.key());
assertEquals(getValue(idx), kv.value());
++idx;
}
assertEquals(10, idx);
iter.close();
}
@Test
public void testDeleteRange() throws Exception {
store.init(spec);
// write 100 kvs
writeKVs(100, 99L);
// iterate all kvs
KVIterator<String, String> iter = store.range(
getKey(0),
getKey(100));
int idx = 0;
while (iter.hasNext()) {
KV<String, String> kv = iter.next();
assertEquals(getKey(idx), kv.key());
assertEquals(getValue(idx), kv.value());
++idx;
}
assertEquals(100, idx);
iter.close();
// delete range
store.deleteRange(getKey(10), getKey(20), 100L);
iter = store.range(
getKey(0),
getKey(100));
idx = 0;
while (iter.hasNext()) {
KV<String, String> kv = iter.next();
assertEquals(getKey(idx), kv.key());
assertEquals(getValue(idx), kv.value());
++idx;
if (10 == idx) {
idx = 21;
}
}
assertEquals(100, idx);
iter.close();
}
@Test
public void testTxnCompareSuccess() throws Exception {
store.init(spec);
// write 10 kvs at revision 99L
writeKVs(20, 99L);
@Cleanup TxnOp<String, String> txnOp = store.getOpFactory().newTxn()
.If(
store.getOpFactory().compareCreateRevision(
CompareResult.EQUAL,
getKey(10),
99L))
.Then(
store.getOpFactory().newPut(
getKey(11),
"test-value",
Options.putAndGet()))
.Else(
store.getOpFactory().newDelete(
getKey(11),
store.getOpFactory().optionFactory().newDeleteOption()
.build()))
.build();
@Cleanup TxnResult<String, String> result = store.txn(100L, txnOp);
assertEquals(Code.OK, result.code());
assertEquals(100L, result.revision());
assertEquals(1, result.results().size());
assertTrue(result.isSuccess());
Result<String, String> subResult = result.results().get(0);
assertEquals(OpType.PUT, subResult.type());
PutResult<String, String> putResult = (PutResult<String, String>) subResult;
KeyValue<String, String> prevResult = putResult.prevKv();
assertEquals(getKey(11), prevResult.key());
assertEquals(getValue(11), prevResult.value());
assertEquals(99L, prevResult.createRevision());
assertEquals(99L, prevResult.modifiedRevision());
assertEquals(0L, prevResult.version());
assertEquals("test-value", store.get(getKey(11)));
}
@Test
public void testTxnCompareFailure() throws Exception {
store.init(spec);
// write 10 kvs at revision 99L
writeKVs(20, 99L);
@Cleanup TxnOp<String, String> txnOp = store.getOpFactory().newTxn()
.If(
store.getOpFactory().compareCreateRevision(
CompareResult.NOT_EQUAL,
getKey(10),
99L))
.Then(
store.getOpFactory().newPut(
getKey(11),
"test-value",
Options.putAndGet()))
.Else(
store.getOpFactory().newDelete(
getKey(11),
Options.deleteAndGet()))
.build();
@Cleanup TxnResult<String, String> result = store.txn(100L, txnOp);
assertEquals(Code.OK, result.code());
assertEquals(100L, result.revision());
assertEquals(1, result.results().size());
assertFalse(result.isSuccess());
Result<String, String> subResult = result.results().get(0);
assertEquals(OpType.DELETE, subResult.type());
DeleteResult<String, String> deleteResult = (DeleteResult<String, String>) subResult;
List<KeyValue<String, String>> prevResults = deleteResult.prevKvs();
assertEquals(1, prevResults.size());
KeyValue<String, String> prevResult = prevResults.get(0);
assertEquals(getKey(11), prevResult.key());
assertEquals(getValue(11), prevResult.value());
assertEquals(99L, prevResult.createRevision());
assertEquals(99L, prevResult.modifiedRevision());
assertEquals(0L, prevResult.version());
assertEquals(null, store.get(getKey(11)));
}
@Test
public void testIncrement() throws Exception {
store.init(spec);
for (int i = 0; i < 5; i++) {
store.increment("key", 100L, 99L + i);
Long number = store.getNumber("key");
assertNotNull(number);
assertEquals(100L * (i + 1), number.longValue());
}
}
@Test
public void testIncrementFailure() throws Exception {
store.init(spec);
store.put("key", "value", 1L);
try {
store.increment("key", 100, 2L);
fail("Should fail to increment a non-number key");
} catch (MVCCStoreException e) {
assertEquals(Code.ILLEGAL_OP, e.getCode());
}
}
}