blob: 8d824052ec6accdce957aae10f932a77afb08344 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.storage.am.lsm.btree;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import org.apache.hyracks.storage.am.btree.OrderedIndexTestContext;
import org.apache.hyracks.storage.am.btree.OrderedIndexTestDriver;
import org.apache.hyracks.storage.am.btree.OrderedIndexTestUtils;
import org.apache.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.common.CheckTuple;
import org.apache.hyracks.storage.am.common.IIndexTestContext;
import org.apache.hyracks.storage.am.common.TestOperationCallback;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext;
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@SuppressWarnings("rawtypes")
public class LSMBTreeUpdateInPlaceScanDiskComponentsTest extends OrderedIndexTestDriver {
// Harness that supplies the I/O manager, buffer caches, merge policy and the other collaborators
// handed to LSMBTreeTestContext.create(); it is set up and torn down around every test.
private final LSMBTreeTestHarness harness = new LSMBTreeTestHarness();
// Sorted set of expected tuples. The modification lambdas add, replace or flag entries here, and
// test() walks it in order while comparing against the disk-order scan.
protected final TreeSet<UpdatedCheckTuple> checkTuples = new TreeSet<>();
// True when the current run has as many key fields as total fields; assigned at the start of runTest().
private boolean hasOnlyKeys;
/**
 * Expected-result tuple carrying two extra flags on top of {@link CheckTuple}.
 * <ul>
 * <li>{@code updated} is toggled by the upsert and delete modifications of this test;</li>
 * <li>{@code antimatter} is set by the delete modification and later compared with the
 * anti-matter flag of the tuple returned by the disk scan.</li>
 * </ul>
 * Both flags start out as {@code false}.
 */
class UpdatedCheckTuple<T extends Comparable<T>> extends CheckTuple<T> {
    private boolean updated;
    private boolean antimatterFlag;

    public UpdatedCheckTuple(int numFields, int numKeys) {
        super(numFields, numKeys);
    }

    /** @return the current value of the "updated" flag */
    public boolean isUpdated() {
        return updated;
    }

    /** @return the current value of the anti-matter flag */
    public boolean isAntimatter() {
        return antimatterFlag;
    }

    /** Overwrites the "updated" flag. */
    public void setUpdated(boolean updated) {
        this.updated = updated;
    }

    /** Overwrites the anti-matter flag. */
    public void setAntimatter(boolean antimatter) {
        this.antimatterFlag = antimatter;
    }
}
/**
 * Creates the test, handing the harness's {@code LEAF_FRAMES_TO_TEST} to the driver superclass.
 */
public LSMBTreeUpdateInPlaceScanDiskComponentsTest() {
super(LSMBTreeTestHarness.LEAF_FRAMES_TO_TEST);
}
/**
 * Runs before each test method and delegates to {@code harness.setUp()}.
 *
 * @throws HyracksDataException if the harness set-up fails
 */
@Before
public void setUp() throws HyracksDataException {
harness.setUp();
}
/**
 * Runs after each test method and delegates to {@code harness.tearDown()}.
 *
 * @throws HyracksDataException if the harness tear-down fails
 */
@After
public void tearDown() throws HyracksDataException {
harness.tearDown();
}
/**
 * @return the {@link Random} owned by the harness; all random keys, values and delete picks in
 *         this test are drawn from it
 */
@Override
protected Random getRandom() {
return harness.getRandom();
}
/**
 * @return the fixed, human-readable label of the operation exercised by this test
 */
@Override
protected String getTestOpName() {
return "Update in-place disk Components Scan";
}
/**
 * Builds the LSM B-tree test context from the collaborators exposed by the harness.
 * <p>
 * The {@code leafType} and {@code filtered} arguments are not forwarded: a literal {@code false}
 * is passed ahead of the two trailing flags, which are derived from {@code hasOnlyKeys}
 * (its negation first, then the value itself).
 *
 * @param fieldSerdes serializers/deserializers of all tuple fields
 * @param numKeys number of leading key fields
 * @param leafType unused by this implementation
 * @param filtered unused by this implementation
 * @return the freshly created test context
 * @throws HyracksDataException if the context cannot be created
 */
@Override
protected OrderedIndexTestContext createTestContext(ISerializerDeserializer[] fieldSerdes, int numKeys,
        BTreeLeafFrameType leafType, boolean filtered) throws HyracksDataException {
    final boolean keysOnly = hasOnlyKeys;
    final boolean hasValueFields = !keysOnly;
    return LSMBTreeTestContext.create(
            harness.getIOManager(),
            harness.getVirtualBufferCaches(),
            harness.getFileReference(),
            harness.getDiskBufferCache(),
            fieldSerdes,
            numKeys,
            harness.getBoomFilterFalsePositiveRate(),
            harness.getMergePolicy(),
            harness.getOperationTracker(),
            harness.getIOScheduler(),
            harness.getIOOperationCallbackFactory(),
            harness.getPageWriteCallbackFactory(),
            harness.getMetadataPageManagerFactory(),
            false,
            hasValueFields,
            keysOnly);
}
/**
 * A single modification applied to the index under test together with the matching bookkeeping
 * on the expected results. Implemented in this file by the insert, upsert and delete lambdas.
 */
interface IndexModification {
/**
 * Applies the modification.
 *
 * @param ctx test context whose index accessor performs the operation
 * @param tuple serialized tuple handed to the index
 * @param checkTuple expected-result counterpart of {@code tuple}
 * @throws HyracksDataException if the index operation fails
 */
void modify(IIndexTestContext ctx, ITupleReference tuple, UpdatedCheckTuple checkTuple)
throws HyracksDataException;
}
/**
 * Inserts the tuple and, only when the insert succeeds, records the check tuple as expected.
 * A duplicate-key failure is tolerated and leaves the expected set untouched; every other
 * failure is propagated.
 */
IndexModification insertModification = (ctx, tuple, checkTuple) -> {
    try {
        ctx.getIndexAccessor().insert(tuple);
        checkTuples.add(checkTuple);
    } catch (HyracksDataException e) {
        if (e.matches(ErrorCode.DUPLICATE_KEY)) {
            // Duplicate keys are expected with random data: nothing was inserted, so nothing is recorded.
            return;
        }
        throw e;
    }
};
/**
 * Upserts the tuple and mirrors the change in the expected set: a new key is simply added,
 * while an already-expected key is replaced by the new check tuple whose "updated" flag is the
 * inverse of the replaced entry's flag. A duplicate-key failure is tolerated and leaves the
 * expected set untouched; every other failure is propagated.
 */
IndexModification upsertModification = (ctx, tuple, checkTuple) -> {
    try {
        ctx.getIndexAccessor().upsert(tuple);
    } catch (HyracksDataException e) {
        if (!e.matches(ErrorCode.DUPLICATE_KEY)) {
            throw e;
        }
        // Tolerated failure: the expected set must not change.
        return;
    }
    // Entries comparing equal to the new check tuple (at most one in a set).
    SortedSet<UpdatedCheckTuple> sameKey = checkTuples.subSet(checkTuple, true, checkTuple, true);
    if (!sameKey.isEmpty()) {
        UpdatedCheckTuple replaced = sameKey.first();
        checkTuples.remove(replaced);
        checkTuple.setUpdated(!replaced.isUpdated());
    }
    checkTuples.add(checkTuple);
};
/**
 * Deletes the tuple from the index. The check tuple is not removed from the expected set;
 * instead its "updated" flag is inverted and it is marked as anti-matter. The test fails if the
 * check tuple is not currently part of the expected set.
 */
IndexModification deleteModification = (ctx, tuple, checkTuple) -> {
    ctx.getIndexAccessor().delete(tuple);
    final boolean known = checkTuples.contains(checkTuple);
    if (!known) {
        fail("Trying to delete tuple " + checkTuple + " that does not exist");
    }
    final boolean flipped = !checkTuple.isUpdated();
    checkTuple.setUpdated(flipped);
    checkTuple.setAntimatter(true);
};
/**
 * Base class for a batch of tuple operations. It holds the {@link IndexModification} that
 * subclasses apply to each tuple they produce or select.
 */
abstract class TupleOperation {
// Modification applied to every tuple handled by performOperation().
protected final IndexModification op;
public TupleOperation(IndexModification op) {
this.op = op;
}
/**
 * Runs the batch.
 *
 * @param ctx test context providing the index and tuple builders
 * @param numTuples upper bound on the number of tuples to process
 * @throws HyracksDataException if an index operation fails
 */
public abstract void performOperation(IIndexTestContext ctx, int numTuples) throws HyracksDataException;
}
/**
 * Batch that creates {@code numTuples} random tuples (integer or string, depending on the serde
 * of the first field) and applies the wrapped modification to each of them.
 */
class GenerateTupleOperation extends TupleOperation {
    public GenerateTupleOperation(IndexModification op) {
        super(op);
    }

    @Override
    public void performOperation(IIndexTestContext ctx, int numTuples) throws HyracksDataException {
        final int totalFields = ctx.getFieldCount();
        final int keyFields = ctx.getKeyFieldCount();
        // The key value range shrinks with the number of key fields: the k-th root of numTuples
        // for k keys (square root for 2 keys, cube root for 3, and so on).
        final int keyRange = (int) Math.ceil(Math.pow(numTuples, 1.0 / keyFields));
        for (int i = 0; i < numTuples; i++) {
            final int ordinal = i + 1;
            // Progress is logged at up to ten evenly spaced points of the batch.
            if (LOGGER.isInfoEnabled() && ordinal % (numTuples / Math.min(10, numTuples)) == 0) {
                LOGGER.info("Generating Tuple " + ordinal + "/" + numTuples);
            }
            final boolean integerFields = ctx.getFieldSerdes()[0] instanceof IntegerSerializerDeserializer;
            final UpdatedCheckTuple expected = integerFields
                    ? createIntTuple(totalFields, keyFields, keyRange, ctx.getTupleBuilder(), ctx.getTuple())
                    : createStringTuple(ctx.getFieldSerdes(), totalFields, keyFields, ctx.getTupleBuilder(),
                            ctx.getTuple());
            op.modify(ctx, ctx.getTuple(), expected);
        }
    }
}
/**
 * Fills {@code tuple} with random string fields and returns the matching expected tuple.
 * Key fields get a random string whose length is between 1 and 10; the remaining fields get a
 * random string of length 5.
 *
 * @param fieldSerdes serializers used to write the fields into {@code tuple}
 * @param fieldCount total number of fields
 * @param numKeyFields number of leading key fields
 * @param tupleBuilder builder used to serialize the tuple
 * @param tuple tuple reference that is reset to the newly built tuple
 * @return the expected-result tuple holding the same field values
 * @throws HyracksDataException if serializing the tuple fails
 */
private UpdatedCheckTuple createStringTuple(ISerializerDeserializer[] fieldSerdes, int fieldCount, int numKeyFields,
        ArrayTupleBuilder tupleBuilder, ArrayTupleReference tuple) throws HyracksDataException {
    String[] fieldValues = new String[fieldCount];
    // Set keys.
    for (int j = 0; j < numKeyFields; j++) {
        // Take the remainder BEFORE the absolute value. Math.abs(Integer.MIN_VALUE) is still
        // negative, so abs-then-remainder could produce a negative length. For every other
        // random value both orders give the same result, so seeded runs are unchanged.
        int length = Math.abs(getRandom().nextInt() % 10) + 1;
        fieldValues[j] = OrderedIndexTestUtils.getRandomString(length, getRandom());
    }
    // Set values.
    for (int j = numKeyFields; j < fieldCount; j++) {
        fieldValues[j] = OrderedIndexTestUtils.getRandomString(5, getRandom());
    }
    TupleUtils.createTuple(tupleBuilder, tuple, fieldSerdes, false, (Object[]) fieldValues);
    UpdatedCheckTuple<String> checkTuple = new UpdatedCheckTuple<>(fieldValues.length, numKeyFields);
    for (String s : fieldValues) {
        checkTuple.appendField(s);
    }
    return checkTuple;
}
/**
 * Fills {@code tuple} with integer fields and returns the matching expected tuple.
 * Each key field is a random int reduced modulo {@code maxValue} (so it may be negative); each
 * non-key field is set to its own field index.
 *
 * @param fieldCount total number of fields
 * @param numKeyFields number of leading key fields
 * @param maxValue modulus applied to the random key values
 * @param tupleBuilder builder used to serialize the tuple
 * @param tuple tuple reference that is reset to the newly built tuple
 * @return the expected-result tuple holding the same field values
 * @throws HyracksDataException if serializing the tuple fails
 */
private UpdatedCheckTuple createIntTuple(int fieldCount, int numKeyFields, int maxValue,
        ArrayTupleBuilder tupleBuilder, ArrayTupleReference tuple) throws HyracksDataException {
    final int[] ints = new int[fieldCount];
    // Keys: random, bounded by the modulus.
    for (int keyIdx = 0; keyIdx < numKeyFields; keyIdx++) {
        final int draw = getRandom().nextInt();
        ints[keyIdx] = draw % maxValue;
    }
    // Non-key fields: the field position itself.
    for (int valueIdx = numKeyFields; valueIdx < fieldCount; valueIdx++) {
        ints[valueIdx] = valueIdx;
    }
    TupleUtils.createIntegerTuple(tupleBuilder, tuple, ints);
    final UpdatedCheckTuple<Integer> expected = new UpdatedCheckTuple<>(ints.length, numKeyFields);
    for (int pos = 0; pos < ints.length; pos++) {
        expected.appendField(ints[pos]);
    }
    return expected;
}
/**
 * Batch that picks up to {@code numTuples} distinct entries at random from the expected set and
 * applies the wrapped modification to each, using a tuple rebuilt from the picked check tuple.
 */
class DeleteTupleOperation extends TupleOperation {
    public DeleteTupleOperation(IndexModification op) {
        super(op);
    }

    @Override
    public void performOperation(IIndexTestContext ctx, int numTuples) throws HyracksDataException {
        final ArrayTupleBuilder keyBuilder = new ArrayTupleBuilder(ctx.getKeyFieldCount());
        final ArrayTupleReference keyTuple = new ArrayTupleReference();
        // Snapshot of the expected set in iteration order; indexes [0, remaining) are still eligible.
        final UpdatedCheckTuple[] candidates = checkTuples.toArray(new UpdatedCheckTuple[0]);
        int remaining = candidates.length;
        for (int i = 0; i < numTuples && remaining > 0; i++) {
            final int ordinal = i + 1;
            // Progress is logged at up to ten evenly spaced points of the batch.
            if (LOGGER.isInfoEnabled() && ordinal % (numTuples / Math.min(10, numTuples)) == 0) {
                LOGGER.info("Deleting Tuple " + ordinal + "/" + numTuples);
            }
            final int pickIdx = Math.abs(getRandom().nextInt() % remaining);
            final UpdatedCheckTuple picked = candidates[pickIdx];
            OrderedIndexTestUtils.createTupleFromCheckTuple(picked, keyBuilder, keyTuple, ctx.getFieldSerdes());
            op.modify(ctx, keyTuple, picked);
            // Move the picked entry behind the eligible range so it cannot be chosen again.
            final int lastIdx = remaining - 1;
            candidates[pickIdx] = candidates[lastIdx];
            candidates[lastIdx] = picked;
            remaining = lastIdx;
        }
    }
}
/**
 * Scenario: generate tuples via insert, then delete a random subset of them.
 */
private void insertDeleteTest(OrderedIndexTestContext ctx) throws HyracksDataException {
    final TupleOperation populate = new GenerateTupleOperation(insertModification);
    final TupleOperation mutate = new DeleteTupleOperation(deleteModification);
    test(ctx, populate, mutate);
}
/**
 * Scenario: generate tuples via upsert, then delete a random subset of them.
 */
private void upsertDeleteTest(OrderedIndexTestContext ctx) throws HyracksDataException {
    final TupleOperation populate = new GenerateTupleOperation(upsertModification);
    final TupleOperation mutate = new DeleteTupleOperation(deleteModification);
    test(ctx, populate, mutate);
}
/**
 * Scenario: generate tuples via insert, then generate a second batch applied via upsert.
 */
private void insertUpsertTest(OrderedIndexTestContext ctx) throws HyracksDataException {
    final TupleOperation populate = new GenerateTupleOperation(insertModification);
    final TupleOperation mutate = new GenerateTupleOperation(upsertModification);
    test(ctx, populate, mutate);
}
/**
 * Scenario: generate tuples via upsert, then generate a second batch also applied via upsert.
 */
private void upsertUpsertTest(OrderedIndexTestContext ctx) throws HyracksDataException {
    final TupleOperation populate = new GenerateTupleOperation(upsertModification);
    final TupleOperation mutate = new GenerateTupleOperation(upsertModification);
    test(ctx, populate, mutate);
}
/**
 * Creates and activates one index, runs the four scenarios (insert/delete, upsert/delete,
 * insert/upsert, upsert/upsert) against it with the index and the expected set cleared after
 * each, then validates, deactivates and destroys the index.
 * <p>
 * The key-range arguments are not used by this test.
 */
@Override
protected void runTest(ISerializerDeserializer[] fieldSerdes, int numKeys, BTreeLeafFrameType leafType,
        ITupleReference lowKey, ITupleReference highKey, ITupleReference prefixLowKey,
        ITupleReference prefixHighKey) throws Exception {
    // Key-only tuples make the test create a secondary index; anything else creates a primary index.
    hasOnlyKeys = fieldSerdes.length == numKeys;
    final OrderedIndexTestContext testCtx = createTestContext(fieldSerdes, numKeys, leafType, false);
    testCtx.getIndex().create();
    testCtx.getIndex().activate();

    insertDeleteTest(testCtx);
    resetIndexAndExpectations(testCtx);

    upsertDeleteTest(testCtx);
    resetIndexAndExpectations(testCtx);

    insertUpsertTest(testCtx);
    resetIndexAndExpectations(testCtx);

    upsertUpsertTest(testCtx);
    resetIndexAndExpectations(testCtx);

    testCtx.getIndex().validate();
    testCtx.getIndex().deactivate();
    testCtx.getIndex().destroy();
}

/** Clears the index under test and then the set of expected tuples, in that order. */
private void resetIndexAndExpectations(OrderedIndexTestContext testCtx) throws HyracksDataException {
    testCtx.getIndex().clear();
    checkTuples.clear();
}
/**
 * Runs one scenario: applies {@code op1} with the full tuple count and {@code op2} with the
 * per-round tuple count, schedules a flush, asserts that exactly one disk component exists, and
 * then compares a disk-order scan of that component with the expected set, entry by entry.
 * Expected entries flagged as updated are skipped when the index has only key fields.
 *
 * @param ctx test context holding the LSM B-tree
 * @param op1 first batch of operations
 * @param op2 second batch of operations
 * @throws HyracksDataException if any index operation fails
 */
protected void test(OrderedIndexTestContext ctx, TupleOperation op1, TupleOperation op2)
        throws HyracksDataException {
    final ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
    op1.performOperation(ctx, AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT);
    final int secondBatchSize =
            AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT / AccessMethodTestsConfig.BTREE_NUM_INSERT_ROUNDS;
    op2.performOperation(ctx, secondBatchSize);
    lsmAccessor.scheduleFlush();

    final LSMBTree lsmTree = (LSMBTree) ctx.getIndex();
    Assert.assertEquals("Check disk components", 1, lsmTree.getDiskComponents().size());
    final ILSMDiskComponent flushed = lsmTree.getDiskComponents().get(0);

    final IndexAccessParameters accessParams =
            new IndexAccessParameters(TestOperationCallback.INSTANCE, TestOperationCallback.INSTANCE);
    final BTree.BTreeAccessor componentAccessor = ((BTree) flushed.getIndex()).createAccessor(accessParams);
    final ITreeIndexCursor scan = componentAccessor.createDiskOrderScanCursor();
    try {
        componentAccessor.diskOrderScan(scan);
        try {
            for (UpdatedCheckTuple expected : checkTuples) {
                if (expected.isUpdated() && hasOnlyKeys) {
                    // No tuple is consumed from the scan for this entry.
                    continue;
                }
                checkReturnedTuple((LSMBTreeTupleReference) getNext(scan), ctx.getFieldSerdes(), expected,
                        ctx.getKeyFieldCount());
            }
            // The scan must not contain anything beyond the expected entries.
            Assert.assertFalse(scan.hasNext());
        } finally {
            scan.close();
        }
    } finally {
        scan.destroy();
    }
}
/**
 * Asserts that a scanned tuple matches its expected counterpart: same anti-matter flag, equal
 * key fields and, unless the scanned tuple is anti-matter, equal non-key fields.
 *
 * @param tuple tuple returned by the scan
 * @param fieldSerdes deserializers for every field, keys first
 * @param checkTuple expected tuple
 * @param numKeys number of leading key fields
 * @throws HyracksDataException if a field cannot be deserialized
 */
protected void checkReturnedTuple(LSMBTreeTupleReference tuple, ISerializerDeserializer[] fieldSerdes,
        UpdatedCheckTuple checkTuple, int numKeys) throws HyracksDataException {
    Assert.assertEquals("Check tuple anti-matter flag", checkTuple.isAntimatter(), tuple.isAntimatter());
    // Keys are always compared.
    for (int keyIdx = 0; keyIdx < numKeys; keyIdx++) {
        final Object actualKey = deserializeTupleField(tuple, fieldSerdes, keyIdx);
        Assert.assertEquals("Actual and expected keys do not match on key " + keyIdx,
                checkTuple.getField(keyIdx), actualKey);
    }
    if (tuple.isAntimatter()) {
        // Non-key fields are not compared for anti-matter tuples.
        return;
    }
    for (int fieldIdx = numKeys; fieldIdx < fieldSerdes.length; fieldIdx++) {
        final Object actualValue = deserializeTupleField(tuple, fieldSerdes, fieldIdx);
        Assert.assertEquals("Actual and expected fields do not match on field " + fieldIdx,
                checkTuple.getField(fieldIdx), actualValue);
    }
}

/** Deserializes field {@code fieldIdx} of {@code tuple} with the serde registered at the same index. */
private Object deserializeTupleField(LSMBTreeTupleReference tuple, ISerializerDeserializer[] fieldSerdes,
        int fieldIdx) throws HyracksDataException {
    final ByteArrayInputStream bytes = new ByteArrayInputStream(tuple.getFieldData(fieldIdx),
            tuple.getFieldStart(fieldIdx), tuple.getFieldLength(fieldIdx));
    final DataInput in = new DataInputStream(bytes);
    return fieldSerdes[fieldIdx].deserialize(in);
}
/**
 * Advances the cursor by one tuple and returns it, failing the test if the cursor is exhausted.
 *
 * @param cursor cursor to advance
 * @return the tuple the cursor is positioned on after advancing
 * @throws HyracksDataException if the cursor operation fails
 */
protected ITupleReference getNext(IIndexCursor cursor) throws HyracksDataException {
// A missing tuple means the scan returned fewer entries than expected.
Assert.assertTrue(cursor.hasNext());
cursor.next();
return cursor.getTuple();
}
}