blob: a60ebdee509990a5acd427e5828524a1aa581763 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.rocksdb.instance;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageProfile;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbProfileView;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.IgniteUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
/** Contains tests for {@link SharedRocksDbInstance}. */
@ExtendWith(ConfigurationExtension.class)
class SharedRocksDbInstanceTest extends IgniteAbstractTest {
private RocksDbStorageEngine engine;
private RocksDbStorageProfile storageProfile;
private SharedRocksDbInstance rocksDb;
@BeforeEach
void setUp(
@InjectConfiguration("mock.profiles.default = {engine = \"rocksDb\", size = 16777216, writeBufferSize = 16777216}")
StorageConfiguration storageConfiguration,
@InjectConfiguration RocksDbStorageEngineConfiguration engineConfig
) throws Exception {
engine = new RocksDbStorageEngine("test", engineConfig, storageConfiguration, workDir, mock(LogSyncer.class));
engine.start();
var profileConfig = (RocksDbProfileView) storageConfiguration.profiles().get("default").value();
storageProfile = new RocksDbStorageProfile(profileConfig);
storageProfile.start();
rocksDb = createDb();
}
@AfterEach
void tearDown() throws Exception {
IgniteUtils.closeAllManually(
rocksDb == null ? null : rocksDb::stop,
storageProfile == null ? null : storageProfile::stop,
engine == null ? null : engine::stop
);
}
private SharedRocksDbInstance createDb() throws Exception {
return new SharedRocksDbInstanceCreator().create(engine, storageProfile, workDir);
}
@Test
void testSortedIndexCfCaching() {
byte[] fooName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64, true, true)
));
byte[] barName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("b", NativeTypes.UUID, true, true)
));
byte[] bazName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("c", NativeTypes.INT64, true, true)
));
byte[] quuxName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("d", NativeTypes.INT64, true, true)
));
ColumnFamily foo = rocksDb.getOrCreateSortedIndexCf(fooName, 1, 0);
// Different index CF, same table.
ColumnFamily bar = rocksDb.getOrCreateSortedIndexCf(barName, 2, 0);
// Same index CF, same table.
ColumnFamily baz = rocksDb.getOrCreateSortedIndexCf(bazName, 3, 0);
// Same index CF, different table.
ColumnFamily quux = rocksDb.getOrCreateSortedIndexCf(quuxName, 4, 1);
assertThat(foo, is(sameInstance(baz)));
assertThat(foo, is(not(sameInstance(bar))));
assertThat(quux, is((sameInstance(baz))));
rocksDb.removeSortedIndex(1, foo);
rocksDb.destroySortedIndexCfIfNeeded(foo);
assertTrue(cfExists(fooName));
rocksDb.removeSortedIndex(2, bar);
rocksDb.destroySortedIndexCfIfNeeded(bar);
assertFalse(cfExists(barName));
rocksDb.removeSortedIndex(3, baz);
rocksDb.destroySortedIndexCfIfNeeded(baz);
assertTrue(cfExists(fooName));
rocksDb.removeSortedIndex(4, quux);
rocksDb.destroySortedIndexCfIfNeeded(quux);
assertFalse(cfExists(fooName));
}
@Test
void testSortedIndexRecovery() throws Exception {
byte[] fooName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64, true, true)
));
byte[] barName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("b", NativeTypes.UUID, true, true)
));
byte[] bazName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("c", NativeTypes.INT64, true, true)
));
byte[] quuxName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("d", NativeTypes.INT64, true, true)
));
ColumnFamily foo = rocksDb.getOrCreateSortedIndexCf(fooName, 1, 0);
// Different index CF, same table.
ColumnFamily bar = rocksDb.getOrCreateSortedIndexCf(barName, 2, 0);
// Same index CF, same table.
ColumnFamily baz = rocksDb.getOrCreateSortedIndexCf(bazName, 3, 0);
// Same index CF, different table.
ColumnFamily quux = rocksDb.getOrCreateSortedIndexCf(quuxName, 4, 1);
assertThat(
rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::indexId).collect(toList()),
containsInAnyOrder(1, 2, 3)
);
assertThat(
rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::columnFamily).collect(toList()),
containsInAnyOrder(foo, bar, baz)
);
assertThat(
rocksDb.sortedIndexes(1).stream().map(IndexColumnFamily::indexId).collect(toList()),
containsInAnyOrder(4)
);
assertThat(
rocksDb.sortedIndexes(1).stream().map(IndexColumnFamily::columnFamily).collect(toList()),
containsInAnyOrder(quux)
);
// Put some data in the CF. We then check that the non-empty CF is restored upon DB restart but the empty one is dropped.
byte[] key = ByteBuffer.allocate(Integer.BYTES * 2)
.order(KEY_BYTE_ORDER)
.putInt(0)
.putInt(1)
.array();
foo.put(key, BYTE_EMPTY_ARRAY);
rocksDb.stop();
rocksDb = createDb();
assertThat(rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::indexId).collect(toList()), contains(1));
assertThat(rocksDb.sortedIndexes(1), is(empty()));
assertTrue(cfExists(fooName));
assertFalse(cfExists(barName));
}
@Test
void testHashIndexRecovery() throws Exception {
assertThat(rocksDb.hashIndexIds(2), is(empty()));
assertThat(rocksDb.hashIndexIds(4), is(empty()));
assertThat(rocksDb.hashIndexIds(5), is(empty()));
// Put some data in the CF. We then check that the non-empty CF is restored upon DB restart but the empty one is dropped.
byte[] key1 = ByteBuffer.allocate(Integer.BYTES * 2)
.order(KEY_BYTE_ORDER)
.putInt(2)
.putInt(1)
.array();
byte[] key2 = ByteBuffer.allocate(Integer.BYTES * 2)
.order(KEY_BYTE_ORDER)
.putInt(4)
.putInt(3)
.array();
rocksDb.hashIndexCf().put(key1, BYTE_EMPTY_ARRAY);
rocksDb.hashIndexCf().put(key2, BYTE_EMPTY_ARRAY);
rocksDb.stop();
rocksDb = createDb();
assertThat(rocksDb.hashIndexIds(2), contains(1));
assertThat(rocksDb.hashIndexIds(4), contains(3));
assertThat(rocksDb.hashIndexIds(5), is(empty()));
}
@RepeatedTest(10)
void testConcurrentSortedIndexReadAndCreate() {
byte[] fooName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64, true, true)
));
rocksDb.getOrCreateSortedIndexCf(fooName, 0, 0);
// Start two concurrent tasks: one searches for an existing index, another creates a new index.
var barrier = new CyclicBarrier(2);
CompletableFuture<List<IndexColumnFamily>> getIndexFuture = supplyAsync(() -> {
try {
barrier.await(1, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new CompletionException(e);
}
return rocksDb.sortedIndexes(0);
});
CompletableFuture<Void> createIndexFuture = runAsync(() -> {
try {
barrier.await(1, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new CompletionException(e);
}
rocksDb.getOrCreateSortedIndexCf(fooName, 1, 1);
});
assertThat(getIndexFuture, willCompleteSuccessfully());
assertThat(createIndexFuture, willCompleteSuccessfully());
assertThat(getIndexFuture.join().stream().map(IndexColumnFamily::indexId).collect(toList()), contains(0));
}
@Test
void testRemoveSortedIndex() {
int tableId = 0;
int indexId = 0;
byte[] fooName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64, true, true)
));
ColumnFamily cf = rocksDb.getOrCreateSortedIndexCf(fooName, indexId, tableId);
rocksDb.removeSortedIndex(indexId, cf);
assertThat(rocksDb.sortedIndexes(tableId), is(empty()));
}
@Test
void testTableDestroyRemovesSortedIndexes() {
int tableId = 0;
byte[] fooName = sortedIndexCfName(List.of(
new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64, true, true)
));
rocksDb.getOrCreateSortedIndexCf(fooName, 0, tableId);
rocksDb.destroyTable(tableId);
assertThat(rocksDb.sortedIndexes(tableId), is(empty()));
}
private boolean cfExists(byte[] cfName) {
try {
// Check Column Family existence by trying to create a new one with the same name.
ColumnFamilyHandle handle = rocksDb.db.createColumnFamily(new ColumnFamilyDescriptor(cfName));
rocksDb.db.destroyColumnFamilyHandle(handle);
return false;
} catch (RocksDBException e) {
return true;
}
}
}