Merge branch 'main' into ignite-17354
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/Storable.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/Storable.java
index 714a5a1..dca9d4e 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/Storable.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/Storable.java
@@ -59,5 +59,5 @@
/**
* Returns I/O for handling this storable.
*/
- IoVersions<? extends AbstractDataPageIo> ioVersions();
+ IoVersions<? extends AbstractDataPageIo<?>> ioVersions();
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
index 260993a..c123437 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
@@ -47,12 +47,6 @@
private volatile long lastAppliedIndex;
- // TODO: IGNITE-17466 Delete it
- private volatile long treeRootPageId;
-
- // TODO: IGNITE-17466 Delete it
- private volatile long reuseListRootPageId;
-
private volatile long versionChainTreeRootPageId;
private volatile long rowVersionFreeListRootPageId;
@@ -73,8 +67,6 @@
*
* @param checkpointId Checkpoint ID.
* @param lastAppliedIndex Last applied index value.
- * @param treeRootPageId Tree root page ID.
- * @param reuseListRootPageId Reuse list root page ID.
* @param versionChainTreeRootPageId Version chain tree root page ID.
* @param rowVersionFreeListRootPageId Row version free list root page ID.
* @param pageCount Count of pages in the partition.
@@ -82,15 +74,11 @@
public PartitionMeta(
@Nullable UUID checkpointId,
long lastAppliedIndex,
- long treeRootPageId,
- long reuseListRootPageId,
long versionChainTreeRootPageId,
long rowVersionFreeListRootPageId,
int pageCount
) {
this.lastAppliedIndex = lastAppliedIndex;
- this.treeRootPageId = treeRootPageId;
- this.reuseListRootPageId = reuseListRootPageId;
this.versionChainTreeRootPageId = versionChainTreeRootPageId;
this.rowVersionFreeListRootPageId = rowVersionFreeListRootPageId;
this.pageCount = pageCount;
@@ -109,8 +97,6 @@
this(
checkpointId,
metaIo.getLastAppliedIndex(pageAddr),
- metaIo.getTreeRootPageId(pageAddr),
- metaIo.getReuseListRootPageId(pageAddr),
metaIo.getVersionChainTreeRootPageId(pageAddr),
metaIo.getRowVersionFreeListRootPageId(pageAddr),
metaIo.getPageCount(pageAddr)
@@ -137,44 +123,6 @@
}
/**
- * Returns tree root page ID.
- */
- public long treeRootPageId() {
- return treeRootPageId;
- }
-
- /**
- * Sets tree root page ID.
- *
- * @param checkpointId Checkpoint ID.
- * @param treeRootPageId Tree root page ID.
- */
- public void treeRootPageId(@Nullable UUID checkpointId, long treeRootPageId) {
- updateSnapshot(checkpointId);
-
- this.treeRootPageId = treeRootPageId;
- }
-
- /**
- * Returns reuse list root page ID.
- */
- public long reuseListRootPageId() {
- return reuseListRootPageId;
- }
-
- /**
- * Sets reuse list root page ID.
- *
- * @param checkpointId Checkpoint ID.
- * @param reuseListRootPageId Reuse list root page ID.
- */
- public void reuseListRootPageId(@Nullable UUID checkpointId, long reuseListRootPageId) {
- updateSnapshot(checkpointId);
-
- this.reuseListRootPageId = reuseListRootPageId;
- }
-
- /**
* Returns version chain tree root page ID.
*/
public long versionChainTreeRootPageId() {
@@ -267,10 +215,6 @@
private final long lastAppliedIndex;
- private final long treeRootPageId;
-
- private final long reuseListRootPageId;
-
private final long versionChainTreeRootPageId;
private final long rowVersionFreeListRootPageId;
@@ -286,8 +230,6 @@
private PartitionMetaSnapshot(@Nullable UUID checkpointId, PartitionMeta partitionMeta) {
this.checkpointId = checkpointId;
this.lastAppliedIndex = partitionMeta.lastAppliedIndex;
- this.treeRootPageId = partitionMeta.treeRootPageId;
- this.reuseListRootPageId = partitionMeta.reuseListRootPageId;
this.versionChainTreeRootPageId = partitionMeta.versionChainTreeRootPageId;
this.rowVersionFreeListRootPageId = partitionMeta.rowVersionFreeListRootPageId;
this.pageCount = partitionMeta.pageCount;
@@ -301,20 +243,6 @@
}
/**
- * Returns tree root page ID.
- */
- public long treeRootPageId() {
- return treeRootPageId;
- }
-
- /**
- * Returns reuse list root page ID.
- */
- public long reuseListRootPageId() {
- return reuseListRootPageId;
- }
-
- /**
* Returns version chain tree root page ID.
*/
public long versionChainTreeRootPageId() {
@@ -343,8 +271,6 @@
*/
void writeTo(PartitionMetaIo metaIo, long pageAddr) {
metaIo.setLastAppliedIndex(pageAddr, lastAppliedIndex);
- metaIo.setTreeRootPageId(pageAddr, treeRootPageId);
- metaIo.setReuseListRootPageId(pageAddr, reuseListRootPageId);
metaIo.setVersionChainTreeRootPageId(pageAddr, versionChainTreeRootPageId);
metaIo.setRowVersionFreeListRootPageId(pageAddr, rowVersionFreeListRootPageId);
metaIo.setPageCount(pageAddr, pageCount);
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
index f7c69a9..d2cf8e3 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
@@ -33,11 +33,7 @@
public class PartitionMetaIo extends PageIo {
private static final int LAST_APPLIED_INDEX_OFF = COMMON_HEADER_END;
- private static final int TREE_ROOT_PAGE_ID_OFF = LAST_APPLIED_INDEX_OFF + Long.BYTES;
-
- private static final int REUSE_LIST_ROOT_PAGE_ID_OFF = TREE_ROOT_PAGE_ID_OFF + Long.BYTES;
-
- private static final int VERSION_CHAIN_TREE_ROOT_PAGE_ID_OFF = REUSE_LIST_ROOT_PAGE_ID_OFF + Long.BYTES;
+ private static final int VERSION_CHAIN_TREE_ROOT_PAGE_ID_OFF = LAST_APPLIED_INDEX_OFF + Long.BYTES;
private static final int ROW_VERSION_FREE_LIST_ROOT_PAGE_ID_OFF = VERSION_CHAIN_TREE_ROOT_PAGE_ID_OFF + Long.BYTES;
@@ -64,8 +60,6 @@
super.initNewPage(pageAddr, pageId, pageSize);
setLastAppliedIndex(pageAddr, 0);
- setTreeRootPageId(pageAddr, 0);
- setReuseListRootPageId(pageAddr, 0);
setVersionChainTreeRootPageId(pageAddr, 0);
setRowVersionFreeListRootPageId(pageAddr, 0);
setPageCount(pageAddr, 0);
@@ -93,52 +87,6 @@
}
/**
- * Sets tree root page ID.
- *
- * @param pageAddr Page address.
- * @param pageId Tree root page ID.
- */
- // TODO: IGNITE-17466 Delete it
- public void setTreeRootPageId(long pageAddr, long pageId) {
- assertPageType(pageAddr);
-
- putLong(pageAddr, TREE_ROOT_PAGE_ID_OFF, pageId);
- }
-
- /**
- * Returns tree root page ID.
- *
- * @param pageAddr Page address.
- */
- // TODO: IGNITE-17466 Delete it
- public long getTreeRootPageId(long pageAddr) {
- return getLong(pageAddr, TREE_ROOT_PAGE_ID_OFF);
- }
-
- /**
- * Sets reuse list root page ID.
- *
- * @param pageAddr Page address.
- * @param pageId Reuse list root page ID.
- */
- // TODO: IGNITE-17466 Delete it
- public void setReuseListRootPageId(long pageAddr, long pageId) {
- assertPageType(pageAddr);
-
- putLong(pageAddr, REUSE_LIST_ROOT_PAGE_ID_OFF, pageId);
- }
-
- /**
- * Returns reuse list root page ID.
- *
- * @param pageAddr Page address.
- */
- // TODO: IGNITE-17466 Delete it
- public long getReuseListRootPageId(long pageAddr) {
- return getLong(pageAddr, REUSE_LIST_ROOT_PAGE_ID_OFF);
- }
-
- /**
* Sets version chain tree root page ID.
*
* @param pageAddr Page address.
@@ -206,8 +154,6 @@
protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
sb.app("TablePartitionMeta [").nl()
.app("lastAppliedIndex=").app(getLastAppliedIndex(addr)).nl()
- .app(", treeRootPageId=").appendHex(getTreeRootPageId(addr)).nl()
- .app(", reuseListRootPageId=").appendHex(getReuseListRootPageId(addr)).nl()
.app(", versionChainTreeRootPageId=").appendHex(getVersionChainTreeRootPageId(addr)).nl()
.app(", rowVersionFreeListRootPageId=").appendHex(getRowVersionFreeListRootPageId(addr)).nl()
.app(", pageCount=").app(getPageCount(addr)).nl()
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/freelist/TestDataRow.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/freelist/TestDataRow.java
index 38cf52c..890f2f1 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/freelist/TestDataRow.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/freelist/TestDataRow.java
@@ -72,7 +72,7 @@
/** {@inheritDoc} */
@Override
- public IoVersions<? extends AbstractDataPageIo> ioVersions() {
+ public IoVersions<? extends AbstractDataPageIo<?>> ioVersions() {
return VERSIONS;
}
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
index cd77545..79a6db2 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
@@ -100,16 +100,12 @@
PartitionMeta meta = manager.readOrCreateMeta(null, partId, filePageStore);
assertEquals(0, meta.lastAppliedIndex());
- assertEquals(0, meta.treeRootPageId());
- assertEquals(0, meta.reuseListRootPageId());
assertEquals(0, meta.versionChainTreeRootPageId());
assertEquals(0, meta.rowVersionFreeListRootPageId());
assertEquals(1, meta.pageCount());
// Change the meta and write it to the file.
meta.lastAppliedIndex(null, 50);
- meta.treeRootPageId(null, 100);
- meta.reuseListRootPageId(null, 500);
meta.versionChainTreeRootPageId(null, 300);
meta.rowVersionFreeListRootPageId(null, 900);
meta.incrementPageCount(null);
@@ -128,8 +124,6 @@
PartitionMeta meta = manager.readOrCreateMeta(null, partId, filePageStore);
assertEquals(50, meta.lastAppliedIndex());
- assertEquals(100, meta.treeRootPageId());
- assertEquals(500, meta.reuseListRootPageId());
assertEquals(300, meta.versionChainTreeRootPageId());
assertEquals(900, meta.rowVersionFreeListRootPageId());
assertEquals(2, meta.pageCount());
@@ -139,7 +133,7 @@
try (FilePageStore filePageStore = createFilePageStore(testFilePath)) {
manager.writeMetaToBuffer(
partId,
- new PartitionMeta(UUID.randomUUID(), 100, 200, 1000, 300, 900, 4).metaSnapshot(null),
+ new PartitionMeta(UUID.randomUUID(), 100, 300, 900, 4).metaSnapshot(null),
buffer.rewind()
);
@@ -155,8 +149,6 @@
PartitionMeta meta = manager.readOrCreateMeta(null, partId, filePageStore);
assertEquals(100, meta.lastAppliedIndex());
- assertEquals(200, meta.treeRootPageId());
- assertEquals(1000, meta.reuseListRootPageId());
assertEquals(300, meta.versionChainTreeRootPageId());
assertEquals(900, meta.rowVersionFreeListRootPageId());
assertEquals(4, meta.pageCount());
@@ -174,8 +166,6 @@
PartitionMeta meta = manager.readOrCreateMeta(null, partId, filePageStore);
assertEquals(0, meta.lastAppliedIndex());
- assertEquals(0, meta.treeRootPageId());
- assertEquals(0, meta.reuseListRootPageId());
assertEquals(0, meta.versionChainTreeRootPageId());
assertEquals(0, meta.rowVersionFreeListRootPageId());
assertEquals(1, meta.pageCount());
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
index 9579b1f..cf203d6 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
@@ -51,36 +51,6 @@
}
@Test
- void testTreeRootPageId() {
- PartitionMeta meta = new PartitionMeta();
-
- assertEquals(0, meta.treeRootPageId());
-
- assertDoesNotThrow(() -> meta.treeRootPageId(null, 100));
-
- assertEquals(100, meta.treeRootPageId());
-
- assertDoesNotThrow(() -> meta.treeRootPageId(UUID.randomUUID(), 500));
-
- assertEquals(500, meta.treeRootPageId());
- }
-
- @Test
- void testReuseListRootPageId() {
- PartitionMeta meta = new PartitionMeta();
-
- assertEquals(0, meta.reuseListRootPageId());
-
- assertDoesNotThrow(() -> meta.reuseListRootPageId(null, 100));
-
- assertEquals(100, meta.reuseListRootPageId());
-
- assertDoesNotThrow(() -> meta.reuseListRootPageId(UUID.randomUUID(), 500));
-
- assertEquals(500, meta.reuseListRootPageId());
- }
-
- @Test
void testPageCount() {
PartitionMeta meta = new PartitionMeta();
@@ -129,40 +99,32 @@
void testSnapshot() {
UUID checkpointId = null;
- PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0, 0, 0);
+ PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0);
- checkSnapshot(meta.metaSnapshot(checkpointId), 0, 0, 0, 0, 0, 0);
- checkSnapshot(meta.metaSnapshot(checkpointId = UUID.randomUUID()), 0, 0, 0, 0, 0, 0);
+ checkSnapshot(meta.metaSnapshot(checkpointId), 0, 0, 0, 0);
+ checkSnapshot(meta.metaSnapshot(checkpointId = UUID.randomUUID()), 0, 0, 0, 0);
meta.lastAppliedIndex(checkpointId, 50);
- meta.treeRootPageId(checkpointId, 100);
- meta.reuseListRootPageId(checkpointId, 500);
meta.versionChainTreeRootPageId(checkpointId, 300);
meta.rowVersionFreeListRootPageId(checkpointId, 900);
meta.incrementPageCount(checkpointId);
- checkSnapshot(meta.metaSnapshot(checkpointId), 0, 0, 0, 0, 0, 0);
- checkSnapshot(meta.metaSnapshot(UUID.randomUUID()), 50, 100, 500, 300, 900, 1);
+ checkSnapshot(meta.metaSnapshot(checkpointId), 0, 0, 0, 0);
+ checkSnapshot(meta.metaSnapshot(UUID.randomUUID()), 50, 300, 900, 1);
meta.lastAppliedIndex(checkpointId = UUID.randomUUID(), 51);
- checkSnapshot(meta.metaSnapshot(checkpointId), 50, 100, 500, 300, 900, 1);
-
- meta.treeRootPageId(checkpointId = UUID.randomUUID(), 101);
- checkSnapshot(meta.metaSnapshot(checkpointId), 51, 100, 500, 300, 900, 1);
-
- meta.reuseListRootPageId(checkpointId = UUID.randomUUID(), 505);
- checkSnapshot(meta.metaSnapshot(checkpointId), 51, 101, 500, 300, 900, 1);
+ checkSnapshot(meta.metaSnapshot(checkpointId), 50, 300, 900, 1);
meta.versionChainTreeRootPageId(checkpointId = UUID.randomUUID(), 303);
- checkSnapshot(meta.metaSnapshot(checkpointId), 51, 101, 505, 300, 900, 1);
+ checkSnapshot(meta.metaSnapshot(checkpointId), 51, 300, 900, 1);
meta.rowVersionFreeListRootPageId(checkpointId = UUID.randomUUID(), 909);
- checkSnapshot(meta.metaSnapshot(checkpointId), 51, 101, 505, 303, 900, 1);
+ checkSnapshot(meta.metaSnapshot(checkpointId), 51, 303, 900, 1);
meta.incrementPageCount(checkpointId = UUID.randomUUID());
- checkSnapshot(meta.metaSnapshot(checkpointId), 51, 101, 505, 303, 909, 1);
+ checkSnapshot(meta.metaSnapshot(checkpointId), 51, 303, 909, 1);
- checkSnapshot(meta.metaSnapshot(UUID.randomUUID()), 51, 101, 505, 303, 909, 2);
+ checkSnapshot(meta.metaSnapshot(UUID.randomUUID()), 51, 303, 909, 2);
}
@Test
@@ -177,15 +139,11 @@
private static void checkSnapshot(
PartitionMetaSnapshot snapshot,
long expLastAppliedIndex,
- long expTreeRootPageId,
- long expReuseListPageId,
long expVersionChainTreeRootPageId,
long expRowVersionFreeListRootPageId,
int expPageCount
) {
assertThat(snapshot.lastAppliedIndex(), equalTo(expLastAppliedIndex));
- assertThat(snapshot.treeRootPageId(), equalTo(expTreeRootPageId));
- assertThat(snapshot.reuseListRootPageId(), equalTo(expReuseListPageId));
assertThat(snapshot.versionChainTreeRootPageId(), equalTo(expVersionChainTreeRootPageId));
assertThat(snapshot.rowVersionFreeListRootPageId(), equalTo(expRowVersionFreeListRootPageId));
assertThat(snapshot.pageCount(), equalTo(expPageCount));
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index cc719c7..3c4e6eb 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -355,7 +355,7 @@
partitionMetaManager.addMeta(
new GroupPartitionId(0, 0),
- new PartitionMeta(null, 0, 0, 0, 0, 0, 3)
+ new PartitionMeta(null, 0, 0, 0, 3)
);
FilePageStore filePageStore = mock(FilePageStore.class);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
index 1eabe26..e88e898 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -47,7 +47,9 @@
* @deprecated Replaced with {@link MvTableStorage}.
*/
@Deprecated
- TableStorage createTable(TableConfiguration tableCfg) throws StorageException;
+ default TableStorage createTable(TableConfiguration tableCfg) throws StorageException {
+ throw new UnsupportedOperationException();
+ }
/**
* Creates new table storage.
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ConcurrentHashMapStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ConcurrentHashMapStorageTest.java
deleted file mode 100644
index cb438c3..0000000
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ConcurrentHashMapStorageTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage;
-
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.TableConfiguration;
-import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.storage.chm.TestConcurrentHashMapPartitionStorage;
-import org.apache.ignite.internal.storage.chm.TestConcurrentHashMapStorageEngine;
-import org.apache.ignite.internal.storage.chm.schema.TestConcurrentHashMapDataStorageChange;
-import org.apache.ignite.internal.storage.chm.schema.TestConcurrentHashMapDataStorageConfigurationSchema;
-import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.engine.TableStorage;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-/**
- * Storage test implementation for {@link TestConcurrentHashMapPartitionStorage}.
- */
-@ExtendWith(ConfigurationExtension.class)
-public class ConcurrentHashMapStorageTest extends AbstractPartitionStorageTest {
- private StorageEngine engine;
-
- private TableStorage table;
-
- @BeforeEach
- public void setUp(
- @InjectConfiguration(
- polymorphicExtensions = {
- HashIndexConfigurationSchema.class,
- UnknownDataStorageConfigurationSchema.class,
- TestConcurrentHashMapDataStorageConfigurationSchema.class,
- ConstantValueDefaultConfigurationSchema.class,
- FunctionCallDefaultConfigurationSchema.class,
- NullValueDefaultConfigurationSchema.class,
- }
- ) TableConfiguration tableCfg
- ) throws Exception {
- engine = new TestConcurrentHashMapStorageEngine();
-
- engine.start();
-
- tableCfg.dataStorage().change(c -> c.convert(TestConcurrentHashMapDataStorageChange.class)).get(1, TimeUnit.SECONDS);
-
- table = engine.createTable(tableCfg);
-
- table.start();
-
- storage = new TestConcurrentHashMapPartitionStorage(0);
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- IgniteUtils.closeAll(
- storage,
- table == null ? null : table::stop,
- engine == null ? null : engine::stop
- );
- }
-}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapPartitionStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapPartitionStorage.java
deleted file mode 100644
index ea5fffd..0000000
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapPartitionStorage.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.chm;
-
-import static java.util.stream.Collectors.toList;
-
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.InvokeClosure;
-import org.apache.ignite.internal.storage.PartitionStorage;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.basic.SimpleDataRow;
-import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Storage implementation based on {@link ConcurrentHashMap}.
- */
-public class TestConcurrentHashMapPartitionStorage implements PartitionStorage {
- /** Name of the snapshot file. */
- private static final String SNAPSHOT_FILE = "snapshot_file";
-
- /** Storage content. */
- private final ConcurrentSkipListMap<ByteArray, byte[]> map = new ConcurrentSkipListMap<>();
-
- private final int partId;
-
- /**
- * Constructor.
- *
- * @param partId Partition id.
- */
- public TestConcurrentHashMapPartitionStorage(int partId) {
- assert partId >= 0 : partId;
-
- this.partId = partId;
- }
-
- /** {@inheritDoc} */
- @Override
- public int partitionId() {
- return partId;
- }
-
- /** {@inheritDoc} */
- @Override
- @Nullable
- public DataRow read(SearchRow key) throws StorageException {
- byte[] keyBytes = key.keyBytes();
-
- byte[] valueBytes = map.get(new ByteArray(keyBytes));
-
- return valueBytes == null ? null : new SimpleDataRow(keyBytes, valueBytes);
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> readAll(List<? extends SearchRow> keys) {
- return keys.stream()
- .map(this::read)
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
- }
-
- /** {@inheritDoc} */
- @Override
- public void write(DataRow row) throws StorageException {
- map.put(new ByteArray(row.keyBytes()), row.valueBytes());
- }
-
- /** {@inheritDoc} */
- @Override
- public void writeAll(List<? extends DataRow> rows) throws StorageException {
- rows.forEach(this::write);
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws StorageException {
- return rows.stream()
- .map(row -> map.putIfAbsent(new ByteArray(row.keyBytes()), row.valueBytes()) == null ? null : row)
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
- }
-
- /** {@inheritDoc} */
- @Override
- public void remove(SearchRow key) throws StorageException {
- map.remove(new ByteArray(key.keyBytes()));
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<SearchRow> removeAll(List<? extends SearchRow> keys) {
- var skippedRows = new ArrayList<SearchRow>(keys.size());
-
- for (SearchRow key : keys) {
- byte[] keyBytes = key.keyBytes();
-
- byte[] removedValueBytes = map.remove(new ByteArray(keyBytes));
-
- if (removedValueBytes == null) {
- skippedRows.add(key);
- }
- }
-
- return skippedRows;
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> removeAllExact(List<? extends DataRow> keyValues) {
- var skippedRows = new ArrayList<DataRow>(keyValues.size());
-
- for (DataRow row : keyValues) {
- var key = new ByteArray(row.keyBytes());
-
- byte[] existingValueBytes = map.get(key);
-
- if (Arrays.equals(existingValueBytes, row.valueBytes())) {
- map.remove(key);
- } else {
- skippedRows.add(row);
- }
- }
-
- return skippedRows;
- }
-
- /** {@inheritDoc} */
- @Nullable
- @Override
- public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
- byte[] keyBytes = key.keyBytes();
-
- ByteArray mapKey = new ByteArray(keyBytes);
-
- byte[] existingDataBytes = map.get(mapKey);
-
- clo.call(existingDataBytes == null ? null : new SimpleDataRow(keyBytes, existingDataBytes));
-
- switch (clo.operationType()) {
- case WRITE:
- DataRow newRow = clo.newRow();
-
- assert newRow != null;
-
- map.put(mapKey, newRow.valueBytes());
-
- break;
-
- case REMOVE:
- map.remove(mapKey);
-
- break;
-
- case NOOP:
- break;
-
- default:
- throw new UnsupportedOperationException(String.valueOf(clo.operationType()));
- }
-
- return clo.result();
- }
-
- /** {@inheritDoc} */
- @Override
- public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
- Iterator<SimpleDataRow> iter = map.entrySet().stream()
- .map(e -> new SimpleDataRow(e.getKey().bytes(), e.getValue()))
- .filter(filter)
- .iterator();
-
- return Cursor.fromIterator(iter);
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<Void> snapshot(Path snapshotPath) {
- return CompletableFuture.runAsync(() -> {
- try (
- OutputStream out = Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE));
- ObjectOutputStream objOut = new ObjectOutputStream(out)
- ) {
- objOut.writeObject(map.keySet().stream().map(ByteArray::bytes).collect(toList()));
- objOut.writeObject(new ArrayList<>(map.values()));
- } catch (Exception e) {
- throw new IgniteInternalException(e);
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override
- public void restoreSnapshot(Path snapshotPath) {
- try (
- InputStream in = Files.newInputStream(snapshotPath.resolve(SNAPSHOT_FILE));
- ObjectInputStream objIn = new ObjectInputStream(in)
- ) {
- var keys = (List<byte[]>) objIn.readObject();
- var values = (List<byte[]>) objIn.readObject();
-
- map.clear();
-
- for (int i = 0; i < keys.size(); i++) {
- map.put(new ByteArray(keys.get(i)), values.get(i));
- }
- } catch (Exception e) {
- throw new IgniteInternalException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void close() throws Exception {
- // No-op.
- }
-
- @Override
- public void destroy() {
- map.clear();
- }
-
- @Override
- public long rowsCount() {
- return map.size();
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TestConcurrentHashMapPartitionStorage that = (TestConcurrentHashMapPartitionStorage) o;
-
- if (!map.equals(that.map)) {
- return false;
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override
- public int hashCode() {
- int hash = 0;
-
- for (Map.Entry<ByteArray, byte[]> entry : map.entrySet()) {
- hash += entry.getKey().hashCode() ^ Arrays.hashCode(entry.getValue());
- }
-
- return hash;
- }
-}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapStorageEngine.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapStorageEngine.java
index 18936e0..04a1228 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapStorageEngine.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapStorageEngine.java
@@ -22,7 +22,6 @@
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.engine.TableStorage;
/**
* Test implementation of the {@link StorageEngine} based on class {@link ConcurrentHashMap}.
@@ -45,15 +44,9 @@
/** {@inheritDoc} */
@Override
- public TableStorage createTable(TableConfiguration tableCfg) throws StorageException {
+ public MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
assert tableCfg.dataStorage().name().value().equals(ENGINE_NAME) : tableCfg.dataStorage().name().value();
- return new TestConcurrentHashMapTableStorage(tableCfg);
- }
-
- /** {@inheritDoc} */
- @Override
- public MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
return new TestConcurrentHashMapMvTableStorage(tableCfg);
}
}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java
deleted file mode 100644
index 830b1f9..0000000
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.chm;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.configuration.schemas.table.TableConfiguration;
-import org.apache.ignite.internal.storage.PartitionStorage;
-import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.engine.TableStorage;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Test implementation of the {@link TableStorage} based on class {@link ConcurrentHashMap}.
- */
-public class TestConcurrentHashMapTableStorage implements TableStorage {
- private final TableConfiguration tableConfig;
-
- private final Map<Integer, TestConcurrentHashMapPartitionStorage> partitions = new ConcurrentHashMap<>();
-
- private volatile boolean started;
-
- /**
- * Constructor.
- *
- * @param tableConfig Table configuration.
- */
- public TestConcurrentHashMapTableStorage(TableConfiguration tableConfig) {
- this.tableConfig = tableConfig;
- }
-
- /** {@inheritDoc} */
- @Override
- public PartitionStorage getOrCreatePartition(int partId) throws StorageException {
- assert partId >= 0 : partId;
- assert started;
-
- return partitions.computeIfAbsent(partId, TestConcurrentHashMapPartitionStorage::new);
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable PartitionStorage getPartition(int partId) {
- assert partId >= 0 : partId;
- assert started;
-
- return partitions.get(partId);
- }
-
- /** {@inheritDoc} */
- @Override
- public void dropPartition(int partId) throws StorageException {
- PartitionStorage partitionStorage = getPartition(partId);
-
- if (partitionStorage != null) {
- partitionStorage.destroy();
- }
- }
-
- @Override
- public boolean isVolatile() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override
- public TableConfiguration configuration() {
- return tableConfig;
- }
-
- /** {@inheritDoc} */
- @Override
- public void start() throws StorageException {
- started = true;
- }
-
- /** {@inheritDoc} */
- @Override
- public void stop() throws StorageException {
- destroy();
- }
-
- /** {@inheritDoc} */
- @Override
- public void destroy() throws StorageException {
- started = false;
-
- partitions.values().forEach(TestConcurrentHashMapPartitionStorage::destroy);
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryPartitionStorage.java
deleted file mode 100644
index 5cf3be5..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryPartitionStorage.java
+++ /dev/null
@@ -1,487 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
-
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Predicate;
-import org.apache.ignite.internal.pagememory.tree.BplusTree;
-import org.apache.ignite.internal.pagememory.tree.IgniteTree;
-import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.InvokeClosure;
-import org.apache.ignite.internal.storage.OperationType;
-import org.apache.ignite.internal.storage.PartitionStorage;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.StorageUtils;
-import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.IgniteCursor;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Abstract implementation of {@link PartitionStorage} based on a {@link BplusTree}.
- */
-// TODO: IGNITE-16644 Support snapshots.
-abstract class AbstractPageMemoryPartitionStorage implements PartitionStorage {
- protected final int partId;
-
- protected final TableTree tree;
-
- protected final TableFreeList freeList;
-
- /**
- * Constructor.
- *
- * @param partId Partition id.
- * @param freeList Table free list.
- * @param tree Table tree.
- */
- public AbstractPageMemoryPartitionStorage(
- int partId,
- TableFreeList freeList,
- TableTree tree
- ) {
- assert partId >= 0 && partId < MAX_PARTITION_ID : partId;
-
- this.partId = partId;
- this.freeList = freeList;
- this.tree = tree;
- }
-
- /** {@inheritDoc} */
- @Override
- public int partitionId() {
- return partId;
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable DataRow read(SearchRow key) throws StorageException {
- try {
- return wrap(tree.findOne(wrap(key)));
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error reading row", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> readAll(List<? extends SearchRow> keys) throws StorageException {
- Collection<DataRow> res = new ArrayList<>(keys.size());
-
- try {
- for (SearchRow key : keys) {
- res.add(wrap(tree.findOne(wrap(key))));
- }
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error reading rows", e);
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override
- public void write(DataRow row) throws StorageException {
- try {
- TableDataRow dataRow = wrap(row);
-
- freeList.insertDataRow(dataRow);
-
- tree.put(dataRow);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error writing row", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void writeAll(List<? extends DataRow> rows) throws StorageException {
- try {
- for (DataRow row : rows) {
- TableDataRow dataRow = wrap(row);
-
- freeList.insertDataRow(dataRow);
-
- tree.put(dataRow);
- }
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error writing rows", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws StorageException {
- Collection<DataRow> cantInsert = new ArrayList<>();
-
- try {
- InsertClosure insertClosure = new InsertClosure(freeList);
-
- for (DataRow row : rows) {
- TableDataRow dataRow = wrap(row);
-
- insertClosure.reset();
-
- insertClosure.newRow = dataRow;
-
- tree.invoke(dataRow, null, insertClosure);
-
- if (insertClosure.oldRow != null) {
- cantInsert.add(row);
- }
- }
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error inserting rows", e);
- }
-
- return cantInsert;
- }
-
- /** {@inheritDoc} */
- @Override
- public void remove(SearchRow key) throws StorageException {
- try {
- TableSearchRow searchRow = wrap(key);
-
- TableDataRow removed = tree.remove(searchRow);
-
- if (removed != null) {
- freeList.removeDataRowByLink(removed.link());
- }
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error removing row", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<SearchRow> removeAll(List<? extends SearchRow> keys) throws StorageException {
- Collection<SearchRow> skippedRows = new ArrayList<>();
-
- try {
- for (SearchRow key : keys) {
- TableDataRow removed = tree.remove(wrap(key));
-
- if (removed != null) {
- freeList.removeDataRowByLink(removed.link());
- } else {
- skippedRows.add(key);
- }
- }
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error removing rows", e);
- }
-
- return skippedRows;
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> removeAllExact(List<? extends DataRow> keyValues) throws StorageException {
- Collection<DataRow> skipped = new ArrayList<>();
-
- try {
- RemoveExactClosure removeExactClosure = new RemoveExactClosure();
-
- for (DataRow keyValue : keyValues) {
- TableDataRow dataRow = wrap(keyValue);
-
- removeExactClosure.reset();
-
- removeExactClosure.forRemoveRow = dataRow;
-
- tree.invoke(dataRow, null, removeExactClosure);
-
- if (removeExactClosure.foundRow == null) {
- skipped.add(keyValue);
- } else {
- freeList.removeDataRowByLink(removeExactClosure.foundRow.link());
- }
- }
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error while removing exact rows", e);
- }
-
- return skipped;
- }
-
- /** {@inheritDoc} */
- @Override
- public <T> @Nullable T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
- IgniteTree.InvokeClosure<TableDataRow> treeClosure = new IgniteTree.InvokeClosure<>() {
- /** {@inheritDoc} */
- @Override
- public void call(@Nullable TableDataRow oldRow) {
- clo.call(wrap(oldRow));
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable TableDataRow newRow() {
- DataRow newRow = clo.newRow();
-
- if (newRow == null) {
- return null;
- }
-
- TableDataRow dataRow = wrap(newRow);
-
- try {
- freeList.insertDataRow(dataRow);
- } catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException(e);
- }
-
- return dataRow;
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteTree.OperationType operationType() {
- OperationType operationType = clo.operationType();
-
- switch (operationType) {
- case WRITE:
- return IgniteTree.OperationType.PUT;
-
- case REMOVE:
- return IgniteTree.OperationType.REMOVE;
-
- case NOOP:
- return IgniteTree.OperationType.NOOP;
-
- default:
- throw new UnsupportedOperationException(String.valueOf(clo.operationType()));
- }
- }
- };
-
- try {
- tree.invoke(wrap(key), null, treeClosure);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error invoking a closure for a row", e);
- }
-
- return clo.result();
- }
-
- /** {@inheritDoc} */
- @Override
- public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
- try {
- IgniteCursor<TableDataRow> treeCursor = tree.find(null, null);
-
- return new Cursor<DataRow>() {
- @Nullable TableDataRow cur = advance();
-
- /** {@inheritDoc} */
- @Override
- public void close() {
- }
-
- /** {@inheritDoc} */
- @Override
- public Iterator<DataRow> iterator() {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- return cur != null;
- }
-
- /** {@inheritDoc} */
- @Override
- public DataRow next() {
- DataRow next = wrap(cur);
-
- if (next == null) {
- throw new NoSuchElementException();
- }
-
- try {
- cur = advance();
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error getting next row", e);
- }
-
- return next;
- }
-
- @Nullable TableDataRow advance() throws IgniteInternalCheckedException {
- while (treeCursor.next()) {
- TableDataRow dataRow = treeCursor.get();
-
- if (filter.test(wrap(dataRow))) {
- return dataRow;
- }
- }
-
- return null;
- }
- };
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error while scanning rows", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<Void> snapshot(Path snapshotPath) {
- throw new UnsupportedOperationException("Snapshots are not supported yet.");
- }
-
- /** {@inheritDoc} */
- @Override
- public void restoreSnapshot(Path snapshotPath) {
- throw new UnsupportedOperationException("Snapshots are not supported yet.");
- }
-
- /** {@inheritDoc} */
- @Override
- public void destroy() throws StorageException {
- try {
- tree.destroy();
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error while destroying data", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public long rowsCount() {
- try {
- return tree.size();
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error occurred while fetching the size.", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void close() {
- tree.close();
- }
-
- private static TableSearchRow wrap(SearchRow searchRow) {
- ByteBuffer key = searchRow.key();
-
- return new TableSearchRow(StorageUtils.hashCode(key), key);
- }
-
- private static TableDataRow wrap(DataRow dataRow) {
- ByteBuffer key = dataRow.key();
- ByteBuffer value = dataRow.value();
-
- return new TableDataRow(StorageUtils.hashCode(key), key, value);
- }
-
- private static @Nullable DataRow wrap(TableDataRow tableDataRow) {
- return tableDataRow == null ? null : new TableDataRowAdapter(tableDataRow);
- }
-
- private static class InsertClosure implements IgniteTree.InvokeClosure<TableDataRow> {
- final TableFreeList freeList;
-
- TableDataRow newRow;
-
- @Nullable TableDataRow oldRow;
-
- InsertClosure(TableFreeList freeList) {
- this.freeList = freeList;
- }
-
- /** {@inheritDoc} */
- @Override
- public void call(@Nullable TableDataRow oldRow) {
- this.oldRow = oldRow;
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable TableDataRow newRow() {
- assert newRow != null;
-
- try {
- freeList.insertDataRow(newRow);
- } catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException(e);
- }
-
- return newRow;
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteTree.OperationType operationType() {
- return oldRow == null ? IgniteTree.OperationType.PUT : IgniteTree.OperationType.NOOP;
- }
-
- void reset() {
- newRow = null;
-
- oldRow = null;
- }
- }
-
- private static class RemoveExactClosure implements IgniteTree.InvokeClosure<TableDataRow> {
- TableDataRow forRemoveRow;
-
- @Nullable TableDataRow foundRow;
-
- /** {@inheritDoc} */
- @Override
- public void call(@Nullable TableDataRow oldRow) {
- assert forRemoveRow != null;
-
- if (oldRow != null && oldRow.value().equals(forRemoveRow.value())) {
- foundRow = oldRow;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable TableDataRow newRow() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override
- public IgniteTree.OperationType operationType() {
- return foundRow == null ? IgniteTree.OperationType.NOOP : IgniteTree.OperationType.REMOVE;
- }
-
- void reset() {
- forRemoveRow = null;
-
- foundRow = null;
- }
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 6210ba4..68d8351 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -27,10 +27,8 @@
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
import org.apache.ignite.internal.tostring.S;
@@ -41,7 +39,7 @@
* Abstract table storage implementation based on {@link PageMemory}.
*/
// TODO: IGNITE-16642 Support indexes.
-public abstract class AbstractPageMemoryTableStorage implements TableStorage, MvTableStorage {
+public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
protected final TableConfiguration tableCfg;
/** List of objects to be closed on the {@link #stop}. */
@@ -49,9 +47,7 @@
protected volatile boolean started;
- protected volatile AtomicReferenceArray<PartitionStorage> partitions;
-
- protected volatile AtomicReferenceArray<MvPartitionStorage> mvPartitions;
+ protected volatile AtomicReferenceArray<AbstractPageMemoryMvPartitionStorage> mvPartitions;
/**
* Constructor.
@@ -73,8 +69,6 @@
public void start() throws StorageException {
TableView tableView = tableCfg.value();
- partitions = new AtomicReferenceArray<>(tableView.partitions());
-
mvPartitions = new AtomicReferenceArray<>(tableView.partitions());
started = true;
@@ -86,61 +80,6 @@
close(false);
}
- /** {@inheritDoc} */
- @Override
- public PartitionStorage getOrCreatePartition(int partId) throws StorageException {
- PartitionStorage partition = getPartition(partId);
-
- if (partition != null) {
- return partition;
- }
-
- partition = createPartitionStorage(partId);
-
- partitions.set(partId, partition);
-
- return partition;
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable PartitionStorage getPartition(int partId) {
- assert started : "Storage has not started yet";
-
- if (partId < 0 || partId >= partitions.length()) {
- throw new IllegalArgumentException(S.toString(
- "Unable to access partition with id outside of configured range",
- "table", tableCfg.name().value(), false,
- "partitionId", partId, false,
- "partitions", partitions.length(), false
- ));
- }
-
- return partitions.get(partId);
- }
-
- /** {@inheritDoc} */
- @Override
- public void dropPartition(int partId) throws StorageException {
- assert started : "Storage has not started yet";
-
- PartitionStorage partition = getPartition(partId);
-
- if (partition != null) {
- partitions.set(partId, null);
-
- partition.destroy();
- }
- }
-
- /**
- * Returns a new instance of {@link AbstractPageMemoryPartitionStorage}.
- *
- * @param partitionId Partition id.
- * @throws StorageException If there is an error while creating the partition storage.
- */
- protected abstract AbstractPageMemoryPartitionStorage createPartitionStorage(int partitionId) throws StorageException;
-
/**
* Returns a new instance of {@link AbstractPageMemoryMvPartitionStorage}.
*
@@ -152,7 +91,7 @@
/** {@inheritDoc} */
@Override
public MvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
- MvPartitionStorage partition = getMvPartition(partitionId);
+ AbstractPageMemoryMvPartitionStorage partition = getMvPartition(partitionId);
if (partition != null) {
return partition;
@@ -167,7 +106,7 @@
/** {@inheritDoc} */
@Override
- public MvPartitionStorage getMvPartition(int partitionId) {
+ public AbstractPageMemoryMvPartitionStorage getMvPartition(int partitionId) {
assert started : "Storage has not started yet";
if (partitionId < 0 || partitionId >= mvPartitions.length()) {
@@ -200,24 +139,26 @@
return CompletableFuture.completedFuture(null);
}
+ /** {@inheritDoc} */
@Override
public void createIndex(String indexName) {
throw new UnsupportedOperationException("Not implemented yet");
}
+ /** {@inheritDoc} */
@Override
- @Nullable
- public SortedIndexStorage getSortedIndex(int partitionId, String indexName) {
+ public @Nullable SortedIndexStorage getSortedIndex(int partitionId, String indexName) {
throw new UnsupportedOperationException("Not implemented yet");
}
+ /** {@inheritDoc} */
@Override
public CompletableFuture<Void> destroyIndex(String indexName) {
throw new UnsupportedOperationException("Not implemented yet");
}
/**
- * Closes all {@link #partitions} and {@link #autoCloseables}.
+ * Closes all {@link #mvPartitions} and {@link #autoCloseables}.
*
* @param destroy Destroy partitions.
* @throws StorageException If failed.
@@ -227,8 +168,8 @@
List<AutoCloseable> autoCloseables = new ArrayList<>(this.autoCloseables);
- for (int i = 0; i < partitions.length(); i++) {
- PartitionStorage partition = partitions.getAndUpdate(i, p -> null);
+ for (int i = 0; i < mvPartitions.length(); i++) {
+ AbstractPageMemoryMvPartitionStorage partition = mvPartitions.getAndUpdate(i, p -> null);
if (partition != null) {
autoCloseables.add(destroy ? partition::destroy : partition);
@@ -244,6 +185,5 @@
}
this.autoCloseables.clear();
- partitions = null;
}
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/FragmentedByteArray.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/FragmentedByteArray.java
deleted file mode 100644
index bee429b..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/FragmentedByteArray.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import java.nio.ByteBuffer;
-
-/**
- * Helper class for reading an array of bytes in fragments.
- *
- * <p>Structure: array length(int) + byte array(array length).
- */
-class FragmentedByteArray {
- private int arrLen = -1;
-
- private byte[] arr = null;
-
- private int off;
-
- /**
- * Reads data from the buffer.
- *
- * @param buf Byte buffer from which to read.
- */
- void readData(ByteBuffer buf) {
- if (buf.remaining() == 0) {
- return;
- }
-
- if (arrLen == -1) {
- if (buf.remaining() >= 4) {
- arrLen = buf.getInt();
- } else {
- if (arr == null) {
- arr = new byte[4];
- }
-
- int len = Math.min(buf.remaining(), 4 - off);
-
- buf.get(arr, off, len);
- off += len;
-
- if (off == 4) {
- ByteBuffer tmpBuf = ByteBuffer.wrap(arr);
-
- tmpBuf.order(buf.order());
-
- arrLen = tmpBuf.getInt();
- arr = null;
- off = 0;
- }
- }
- }
-
- if (arrLen != -1) {
- if (arr == null) {
- arr = new byte[arrLen];
- }
-
- int len = Math.min(buf.remaining(), arrLen - off);
-
- buf.get(arr, off, len);
- off += len;
- }
- }
-
- /**
- * Returns true if the array has been read completely.
- */
- boolean ready() {
- return arrLen != -1 && off == arrLen;
- }
-
- /**
- * Returns byte array.
- */
- byte[] array() {
- return arr;
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageIoModule.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageIoModule.java
deleted file mode 100644
index bc6ddd8..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageIoModule.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import java.util.Collection;
-import java.util.List;
-import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.pagememory.io.PageIoModule;
-import org.apache.ignite.internal.storage.pagememory.io.TableDataIo;
-import org.apache.ignite.internal.storage.pagememory.io.TableInnerIo;
-import org.apache.ignite.internal.storage.pagememory.io.TableLeafIo;
-import org.apache.ignite.internal.storage.pagememory.io.TableMetaIo;
-
-/**
- * {@link PageIoModule} implementation in storage-page-memory module.
- */
-public class PageMemoryStorageIoModule implements PageIoModule {
- /** {@inheritDoc} */
- @Override
- public Collection<IoVersions<?>> ioVersions() {
- return List.of(
- TableMetaIo.VERSIONS,
- TableInnerIo.VERSIONS,
- TableLeafIo.VERSIONS,
- TableDataIo.VERSIONS
- );
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryPartitionStorage.java
deleted file mode 100644
index 4f7d7d5..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryPartitionStorage.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import java.util.Collection;
-import java.util.List;
-import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
-import org.apache.ignite.internal.pagememory.tree.BplusTree;
-import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.InvokeClosure;
-import org.apache.ignite.internal.storage.PartitionStorage;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.util.IgniteCursor;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Implementation of {@link PartitionStorage} based on a {@link BplusTree} for persistent case.
- */
-public class PersistentPageMemoryPartitionStorage extends AbstractPageMemoryPartitionStorage {
- private final CheckpointTimeoutLock checkpointTimeoutLock;
-
- /**
- * Constructor.
- *
- * @param partId Partition id.
- * @param freeList Table free list.
- * @param tree Table tree.
- * @param checkpointTimeoutLock Checkpoint timeout lock.
- */
- public PersistentPageMemoryPartitionStorage(
- int partId,
- TableFreeList freeList,
- TableTree tree,
- CheckpointTimeoutLock checkpointTimeoutLock
- ) {
- super(partId, freeList, tree);
-
- this.checkpointTimeoutLock = checkpointTimeoutLock;
- }
-
- /** {@inheritDoc} */
- @Override
- public void write(DataRow row) throws StorageException {
- checkpointTimeoutLock.checkpointReadLock();
-
- try {
- super.write(row);
- } finally {
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void writeAll(List<? extends DataRow> rows) throws StorageException {
- checkpointTimeoutLock.checkpointReadLock();
-
- try {
- super.writeAll(rows);
- } finally {
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws StorageException {
- checkpointTimeoutLock.checkpointReadLock();
-
- try {
- return super.insertAll(rows);
- } finally {
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void remove(SearchRow key) throws StorageException {
- checkpointTimeoutLock.checkpointReadLock();
-
- try {
- super.remove(key);
- } finally {
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<SearchRow> removeAll(List<? extends SearchRow> keys) throws StorageException {
- checkpointTimeoutLock.checkpointReadLock();
-
- try {
- return super.removeAll(keys);
- } finally {
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> removeAllExact(List<? extends DataRow> keyValues) throws StorageException {
- checkpointTimeoutLock.checkpointReadLock();
-
- try {
- return super.removeAllExact(keyValues);
- } finally {
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public <T> @Nullable T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
- checkpointTimeoutLock.checkpointReadLock();
-
- try {
- return super.invoke(key, clo);
- } finally {
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void destroy() throws StorageException {
- checkpointTimeoutLock.checkpointReadLock();
-
- try {
- // TODO: IGNITE-17132 Fix partition destruction
-
- IgniteCursor<TableDataRow> cursor = tree.find(null, null);
-
- while (cursor.next()) {
- TableDataRow row = cursor.get();
-
- if (tree.removex(row)) {
- freeList.removeDataRowByLink(row.link());
- }
- }
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error destroy partition: " + partId, e);
- } finally {
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 0eafd23..0f07eb6 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -42,7 +42,6 @@
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryDataStorageView;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
@@ -188,7 +187,7 @@
/** {@inheritDoc} */
@Override
- public PersistentPageMemoryTableStorage createTable(TableConfiguration tableCfg) throws StorageException {
+ public PersistentPageMemoryTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
TableView tableView = tableCfg.value();
assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
@@ -198,12 +197,6 @@
return new PersistentPageMemoryTableStorage(this, tableCfg, regions.get(dataStorageView.dataRegion()));
}
- /** {@inheritDoc} */
- @Override
- public MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
- return createTable(tableCfg);
- }
-
/**
* Returns checkpoint manager, {@code null} if engine not started.
*/
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 10f5100..55de1b0 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -104,79 +104,6 @@
/** {@inheritDoc} */
@Override
- protected PersistentPageMemoryPartitionStorage createPartitionStorage(int partitionId) throws StorageException {
- TableView tableView = tableCfg.value();
-
- FilePageStore filePageStore = ensurePartitionFilePageStore(tableView, partitionId);
-
- CheckpointManager checkpointManager = dataRegion.checkpointManager();
-
- CheckpointTimeoutLock checkpointTimeoutLock = checkpointManager.checkpointTimeoutLock();
-
- checkpointTimeoutLock.checkpointReadLock();
-
- try {
- PersistentPageMemory persistentPageMemory = dataRegion.pageMemory();
-
- int grpId = tableView.tableId();
-
- CheckpointProgress lastCheckpointProgress = checkpointManager.lastCheckpointProgress();
-
- UUID checkpointId = lastCheckpointProgress == null ? null : lastCheckpointProgress.id();
-
- PartitionMeta meta = dataRegion.partitionMetaManager().readOrCreateMeta(
- checkpointId,
- new GroupPartitionId(grpId, partitionId),
- filePageStore
- );
-
- dataRegion.partitionMetaManager().addMeta(new GroupPartitionId(grpId, partitionId), meta);
-
- filePageStore.pages(meta.pageCount());
-
- filePageStore.setPageAllocationListener(pageIdx -> {
- assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
-
- CheckpointProgress last = checkpointManager.lastCheckpointProgress();
-
- meta.incrementPageCount(last == null ? null : last.id());
- });
-
- boolean initNewTree = false;
-
- if (meta.treeRootPageId() == 0) {
- meta.treeRootPageId(checkpointId, persistentPageMemory.allocatePage(grpId, partitionId, FLAG_AUX));
-
- initNewTree = true;
- }
-
- boolean initNewReuseList = false;
-
- if (meta.reuseListRootPageId() == 0) {
- meta.reuseListRootPageId(checkpointId, persistentPageMemory.allocatePage(grpId, partitionId, FLAG_AUX));
-
- initNewReuseList = true;
- }
-
- TableFreeList tableFreeList = createTableFreeList(tableView, partitionId, meta.reuseListRootPageId(), initNewReuseList);
-
- autoCloseables.add(tableFreeList::close);
-
- TableTree tableTree = createTableTree(tableView, partitionId, tableFreeList, meta.treeRootPageId(), initNewTree);
-
- return new PersistentPageMemoryPartitionStorage(partitionId, tableFreeList, tableTree, checkpointTimeoutLock);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException(
- String.format("Error getting or creating partition [tableName=%s, partitionId=%s]", tableView.name(), partitionId),
- e
- );
- } finally {
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override
public void destroy() throws StorageException {
close(true);
}
@@ -298,46 +225,10 @@
}
/**
- * Returns new {@link TableFreeList} instance for partition.
- *
- * @param tableView Table configuration.
- * @param partId Partition ID.
- * @param rootPageId Root page ID.
- * @param initNew {@code True} if new metadata should be initialized.
- * @throws StorageException If failed.
- */
- private TableFreeList createTableFreeList(
- TableView tableView,
- int partId,
- long rootPageId,
- boolean initNew
- ) throws StorageException {
- try {
- return new TableFreeList(
- tableView.tableId(),
- partId,
- dataRegion.pageMemory(),
- PageLockListenerNoOp.INSTANCE,
- rootPageId,
- initNew,
- null,
- PageEvictionTrackerNoOp.INSTANCE,
- IoStatisticsHolderNoOp.INSTANCE
- );
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException(
- String.format("Error creating TableFreeList [tableName=%s, partitionId=%s]", tableView.name(), partId),
- e
- );
- }
- }
-
- /**
* Returns new {@link RowVersionFreeList} instance for partition.
*
* @param tableView Table configuration.
* @param partId Partition ID.
- * @param reuseList Reuse list.
* @param rootPageId Root page ID.
* @param initNew {@code True} if new metadata should be initialized.
* @throws StorageException If failed.
@@ -374,45 +265,6 @@
*
* @param tableView Table configuration.
* @param partId Partition ID.
- * @param freeList Table free list.
- * @param rootPageId Root page ID.
- * @param initNewTree {@code True} if new tree should be created.
- * @throws StorageException If failed.
- */
- private TableTree createTableTree(
- TableView tableView,
- int partId,
- TableFreeList freeList,
- long rootPageId,
- boolean initNewTree
- ) throws StorageException {
- int grpId = tableView.tableId();
-
- try {
- return new TableTree(
- grpId,
- tableView.name(),
- partId,
- dataRegion.pageMemory(),
- PageLockListenerNoOp.INSTANCE,
- new AtomicLong(),
- rootPageId,
- freeList,
- initNewTree
- );
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException(
- String.format("Error creating TableTree [tableName=%s, partitionId=%s]", tableView.name(), partId),
- e
- );
- }
- }
-
- /**
- * Returns new {@link TableTree} instance for partition.
- *
- * @param tableView Table configuration.
- * @param partId Partition ID.
* @param freeList {@link VersionChain} free list.
* @param rootPageId Root page ID.
* @param initNewTree {@code True} if new tree should be created.
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRow.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRow.java
deleted file mode 100644
index 18b6dc6..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRow.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.pagememory.Storable;
-import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
-import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.pagememory.io.TableDataIo;
-
-/**
- * {@link DataRow} implementation.
- */
-public class TableDataRow extends TableSearchRow implements Storable {
- private long link;
-
- private final ByteBuffer value;
-
- /**
- * Constructor.
- *
- * @param link Row link.
- * @param hash Row hash.
- * @param key Key byte buffer.
- * @param value Value byte buffer.
- */
- public TableDataRow(long link, int hash, ByteBuffer key, ByteBuffer value) {
- super(hash, key);
-
- assert !value.isReadOnly();
- assert value.position() == 0;
-
- this.link = link;
-
- this.value = value;
- }
-
- /**
- * Constructor.
- *
- * @param hash Row hash.
- * @param key Key byte buffer.
- * @param value Value byte buffer.
- */
- public TableDataRow(int hash, ByteBuffer key, ByteBuffer value) {
- this(0, hash, key, value);
- }
-
- /** {@inheritDoc} */
- @Override
- public void link(long link) {
- this.link = link;
- }
-
- /** {@inheritDoc} */
- @Override
- public long link() {
- return link;
- }
-
- /** {@inheritDoc} */
- @Override
- public int partition() {
- return partitionId(pageId(link));
- }
-
- /** {@inheritDoc} */
- @Override
- public int size() {
- return 4 + key.limit() + 4 + value.limit();
- }
-
- /** {@inheritDoc} */
- @Override
- public int headerSize() {
- // Key size (int).
- return 4;
- }
-
- /** {@inheritDoc} */
- @Override
- public IoVersions<? extends AbstractDataPageIo> ioVersions() {
- return TableDataIo.VERSIONS;
- }
-
- /**
- * Returns value object as a byte buffer.
- */
- public ByteBuffer value() {
- return value.rewind();
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRowAdapter.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRowAdapter.java
deleted file mode 100644
index 3fba62e..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRowAdapter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import static org.apache.ignite.internal.storage.StorageUtils.toByteArray;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.storage.DataRow;
-
-/**
- * Delegating implementation of {@link DataRow}.
- */
-class TableDataRowAdapter implements DataRow {
- private final TableDataRow tableDataRow;
-
- /**
- * Constructor.
- *
- * @param tableDataRow Table data row.
- */
- TableDataRowAdapter(TableDataRow tableDataRow) {
- this.tableDataRow = tableDataRow;
- }
-
- /** {@inheritDoc} */
- @Override
- public byte[] valueBytes() {
- return toByteArray(value());
- }
-
- /** {@inheritDoc} */
- @Override
- public ByteBuffer value() {
- return tableDataRow.value();
- }
-
- /** {@inheritDoc} */
- @Override
- public byte[] keyBytes() {
- return toByteArray(key());
- }
-
- /** {@inheritDoc} */
- @Override
- public ByteBuffer key() {
- return tableDataRow.key();
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableFreeList.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableFreeList.java
deleted file mode 100644
index 03c9c40..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableFreeList.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.pagememory.PageMemory;
-import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
-import org.apache.ignite.internal.pagememory.freelist.AbstractFreeList;
-import org.apache.ignite.internal.pagememory.freelist.FreeList;
-import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
-import org.apache.ignite.internal.pagememory.util.PageLockListener;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * {@link FreeList} implementation for storage-page-memory module.
- */
-public class TableFreeList extends AbstractFreeList<TableDataRow> {
- private static final IgniteLogger LOG = Loggers.forClass(TableFreeList.class);
-
- private final IoStatisticsHolder statHolder;
-
- /**
- * Constructor.
- *
- * @param grpId Group ID.
- * @param pageMem Page memory.
- * @param lockLsnr Page lock listener.
- * @param metaPageId Metadata page ID.
- * @param initNew {@code True} if new metadata should be initialized.
- * @param pageListCacheLimit Page list cache limit.
- * @param evictionTracker Page eviction tracker.
- * @param statHolder Statistics holder to track IO operations.
- * @throws IgniteInternalCheckedException If failed.
- */
- public TableFreeList(
- int grpId,
- int partId,
- PageMemory pageMem,
- PageLockListener lockLsnr,
- long metaPageId,
- boolean initNew,
- @Nullable AtomicLong pageListCacheLimit,
- PageEvictionTracker evictionTracker,
- IoStatisticsHolder statHolder
- ) throws IgniteInternalCheckedException {
- super(
- grpId,
- partId,
- "TableFreeList_" + grpId,
- pageMem,
- null,
- lockLsnr,
- LOG,
- metaPageId,
- initNew,
- pageListCacheLimit,
- evictionTracker
- );
-
- this.statHolder = statHolder;
- }
-
- /**
- * Inserts a row.
- *
- * @param row Row.
- * @throws IgniteInternalCheckedException If failed.
- */
- public void insertDataRow(TableDataRow row) throws IgniteInternalCheckedException {
- super.insertDataRow(row, statHolder);
- }
-
- /**
- * Removes a row by link.
- *
- * @param link Row link.
- * @throws IgniteInternalCheckedException If failed.
- */
- public void removeDataRowByLink(long link) throws IgniteInternalCheckedException {
- super.removeDataRowByLink(link, statHolder);
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableSearchRow.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableSearchRow.java
deleted file mode 100644
index 17bc257..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableSearchRow.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.storage.SearchRow;
-
-/**
- * {@link SearchRow} implementation.
- */
-public class TableSearchRow {
- protected final int hash;
-
- protected final ByteBuffer key;
-
- /**
- * Constructor.
- *
- * @param hash Key hash.
- * @param key Key byte buffer.
- */
- public TableSearchRow(int hash, ByteBuffer key) {
- assert !key.isReadOnly();
- assert key.position() == 0;
-
- this.hash = hash;
- this.key = key;
- }
-
- /**
- * Returns key object as a byte buffer.
- */
- public ByteBuffer key() {
- return key.rewind();
- }
-
- /**
- * Returns hash of row.
- */
- public int hash() {
- return hash;
- }
-
- /**
- * Returns a row link.
- */
- public long link() {
- return 0;
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
deleted file mode 100644
index 7bf7f04..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.getBytes;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
-import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.FULL;
-import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.internal.pagememory.PageMemory;
-import org.apache.ignite.internal.pagememory.io.DataPagePayload;
-import org.apache.ignite.internal.pagememory.reuse.ReuseList;
-import org.apache.ignite.internal.pagememory.tree.BplusTree;
-import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
-import org.apache.ignite.internal.pagememory.util.PageLockListener;
-import org.apache.ignite.internal.storage.pagememory.io.RowIo;
-import org.apache.ignite.internal.storage.pagememory.io.TableDataIo;
-import org.apache.ignite.internal.storage.pagememory.io.TableInnerIo;
-import org.apache.ignite.internal.storage.pagememory.io.TableLeafIo;
-import org.apache.ignite.internal.storage.pagememory.io.TableMetaIo;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * {@link BplusTree} implementation for storage-page-memory module.
- */
-public class TableTree extends BplusTree<TableSearchRow, TableDataRow> {
- /**
- * Constructor.
- *
- * @param grpId Group ID.
- * @param grpName Group name.
- * @param partId Partition ID.
- * @param pageMem Page memory.
- * @param lockLsnr Page lock listener.
- * @param globalRmvId Global remove ID.
- * @param metaPageId Meta page ID.
- * @param reuseList Reuse list.
- * @param initNew {@code True} if new tree should be created.
- */
- public TableTree(
- int grpId,
- String grpName,
- int partId,
- PageMemory pageMem,
- PageLockListener lockLsnr,
- AtomicLong globalRmvId,
- long metaPageId,
- @Nullable ReuseList reuseList,
- boolean initNew
- ) throws IgniteInternalCheckedException {
- super(
- "TableTree_" + grpId,
- grpId,
- grpName,
- partId,
- pageMem,
- lockLsnr,
- globalRmvId,
- metaPageId,
- reuseList
- );
-
- setIos(TableInnerIo.VERSIONS, TableLeafIo.VERSIONS, TableMetaIo.VERSIONS);
-
- initTree(initNew);
- }
-
- /** {@inheritDoc} */
- @Override
- protected int compare(BplusIo<TableSearchRow> io, long pageAddr, int idx, TableSearchRow row) throws IgniteInternalCheckedException {
- RowIo rowIo = (RowIo) io;
-
- int cmp = Integer.compare(rowIo.hash(pageAddr, idx), row.hash());
-
- return cmp != 0 ? cmp : compareRows(rowIo.link(pageAddr, idx), row);
- }
-
- /** {@inheritDoc} */
- @Override
- public TableDataRow getRow(BplusIo<TableSearchRow> io, long pageAddr, int idx, Object x) throws IgniteInternalCheckedException {
- RowIo rowIo = (RowIo) io;
-
- int hash = rowIo.hash(pageAddr, idx);
- long link = rowIo.link(pageAddr, idx);
-
- return getRowByLink(link, hash, FULL);
- }
-
- /**
- * Returns a row by link.
- *
- * @param link Row link.
- * @param hash Row hash.
- * @param rowData Specifies what data to lookup.
- * @throws IgniteInternalCheckedException If failed.
- */
- public TableDataRow getRowByLink(final long link, int hash, RowData rowData) throws IgniteInternalCheckedException {
- assert link != 0;
-
- FragmentedByteArray keyBytes = null;
- FragmentedByteArray valueBytes = null;
-
- long nextLink = link;
-
- do {
- final long pageId = pageId(nextLink);
-
- final long page = pageMem.acquirePage(grpId, pageId, statisticsHolder());
-
- try {
- long pageAddr = pageMem.readLock(grpId, pageId, page);
-
- assert pageAddr != 0L : nextLink;
-
- try {
- TableDataIo dataIo = pageMem.ioRegistry().resolve(pageAddr);
-
- int itemId = itemId(nextLink);
-
- int pageSize = pageMem.realPageSize(grpId);
-
- DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
-
- if (!data.hasMoreFragments() && nextLink == link) {
- // Good luck: we can read the row without fragments.
- return readFullRow(link, hash, rowData, pageAddr + data.offset());
- }
-
- ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize);
-
- dataBuf.position(data.offset());
- dataBuf.limit(data.offset() + data.payloadSize());
-
- if (keyBytes == null) {
- keyBytes = new FragmentedByteArray();
- }
-
- keyBytes.readData(dataBuf);
-
- if (keyBytes.ready()) {
- if (rowData == KEY_ONLY) {
- nextLink = 0;
- continue;
- }
-
- if (valueBytes == null) {
- valueBytes = new FragmentedByteArray();
- }
-
- valueBytes.readData(dataBuf);
-
- if (valueBytes.ready()) {
- nextLink = 0;
- continue;
- }
- }
-
- nextLink = data.nextLink();
- } finally {
- pageMem.readUnlock(grpId, pageId, page);
- }
- } finally {
- pageMem.releasePage(grpId, pageId, page);
- }
- } while (nextLink != 0);
-
- ByteBuffer key = ByteBuffer.wrap(keyBytes.array());
- ByteBuffer value = ByteBuffer.wrap(valueBytes == null ? BYTE_EMPTY_ARRAY : valueBytes.array());
-
- return new TableDataRow(link, hash, key, value);
- }
-
- private TableDataRow readFullRow(long link, int hash, RowData rowData, long pageAddr) {
- int off = 0;
-
- int keyBytesLen = getInt(pageAddr, off);
- off += 4;
-
- byte[] keyBytes = getBytes(pageAddr, off, keyBytesLen);
- off += keyBytesLen;
-
- if (rowData == KEY_ONLY) {
- return new TableDataRow(link, hash, ByteBuffer.wrap(keyBytes), ByteBuffer.wrap(BYTE_EMPTY_ARRAY));
- }
-
- int valueBytesLen = getInt(pageAddr, off);
- off += 4;
-
- byte[] valueBytes = getBytes(pageAddr, off, valueBytesLen);
-
- return new TableDataRow(link, hash, ByteBuffer.wrap(keyBytes), ByteBuffer.wrap(valueBytes));
- }
-
- private int compareRows(final long link, TableSearchRow row) throws IgniteInternalCheckedException {
- assert link != 0;
-
- long nextLink = link;
-
- int keyBytesLen = -1;
- int keyBytesOff = 0;
-
- do {
- final long pageId = pageId(nextLink);
-
- final long page = pageMem.acquirePage(grpId, pageId, statisticsHolder());
-
- try {
- final long pageAddr = pageMem.readLock(grpId, pageId, page);
-
- assert pageAddr != 0L : nextLink;
-
- try {
- TableDataIo dataIo = pageMem.ioRegistry().resolve(pageAddr);
-
- int itemId = itemId(nextLink);
-
- int pageSize = pageMem.realPageSize(grpId);
-
- DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
-
- if (!data.hasMoreFragments() && nextLink == link) {
- // Good luck: we can compare the rows without fragments.
- return compareRowsFull(pageAddr + data.offset(), row);
- }
-
- ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize);
-
- dataBuf.position(data.offset());
- dataBuf.limit(data.offset() + data.payloadSize());
-
- ByteBuffer keyBuf = row.key();
-
- if (keyBytesLen == -1) {
- // Guaranteed to read because we store it in the header.
- keyBytesLen = dataBuf.getInt();
-
- int cmp = Integer.compare(keyBytesLen, keyBuf.limit());
-
- if (cmp != 0) {
- return cmp;
- }
- }
-
- if (dataBuf.remaining() > 0) {
- int len = Math.min(dataBuf.remaining(), keyBytesLen - keyBytesOff);
-
- int dataBufPos = dataBuf.position();
-
- dataBuf.position(dataBufPos);
- dataBuf.limit(dataBufPos + len);
-
- int oldKeyBufLimit = keyBuf.limit();
-
- keyBuf.position(keyBytesOff);
- keyBuf.limit(keyBytesOff + len);
-
- int cmp = dataBuf.compareTo(keyBuf);
-
- keyBytesOff += len;
-
- keyBuf.limit(oldKeyBufLimit);
-
- if (cmp != 0 || keyBytesOff == keyBytesLen) {
- return cmp;
- }
- }
-
- nextLink = data.nextLink();
- } finally {
- pageMem.readUnlock(grpId, pageId, page);
- }
- } finally {
- pageMem.releasePage(grpId, pageId, page);
- }
- } while (nextLink != 0);
-
- throw new IgniteInternalCheckedException("Row comparison error [link=" + link + ", row=" + row + "]");
- }
-
- private int compareRowsFull(final long pageAddr, TableSearchRow row) {
- int off = 0;
-
- int keyBytesLen = getInt(pageAddr, off);
- off += 4;
-
- ByteBuffer key = row.key();
-
- int cmp = Integer.compare(keyBytesLen, key.limit());
-
- if (cmp != 0) {
- return cmp;
- }
-
- return wrapPointer(pageAddr + off, keyBytesLen).compareTo(key);
- }
-
- /**
- * Row data.
- */
- public enum RowData {
- /** Only {@link TableDataRow#key()} key}. */
- KEY_ONLY,
-
- /** All: {@link TableDataRow#key()} key} and {@link TableDataRow#value()} value}. */
- FULL
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
index 10682c3..26f7fd8 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
@@ -50,8 +50,6 @@
private volatile VolatilePageMemory pageMemory;
- private volatile TableFreeList tableFreeList;
-
private volatile RowVersionFreeList rowVersionFreeList;
/**
@@ -81,12 +79,6 @@
pageMemory.start();
try {
- tableFreeList = createTableFreeList(pageMemory);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error creating a TableFreeList", e);
- }
-
- try {
rowVersionFreeList = createRowVersionFreeList(pageMemory, null);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error creating a RowVersionFreeList", e);
@@ -95,22 +87,6 @@
this.pageMemory = pageMemory;
}
- private TableFreeList createTableFreeList(PageMemory pageMemory) throws IgniteInternalCheckedException {
- long metaPageId = pageMemory.allocatePage(FREE_LIST_GROUP_ID, FREE_LIST_PARTITION_ID, FLAG_AUX);
-
- return new TableFreeList(
- FREE_LIST_GROUP_ID,
- FREE_LIST_PARTITION_ID,
- pageMemory,
- PageLockListenerNoOp.INSTANCE,
- metaPageId,
- true,
- null,
- PageEvictionTrackerNoOp.INSTANCE,
- IoStatisticsHolderNoOp.INSTANCE
- );
- }
-
private static RowVersionFreeList createRowVersionFreeList(
PageMemory pageMemory,
@Nullable ReuseList reuseList
@@ -138,7 +114,6 @@
public void stop() throws Exception {
closeAll(
pageMemory != null ? () -> pageMemory.stop(true) : null,
- tableFreeList != null ? tableFreeList::close : null,
rowVersionFreeList != null ? rowVersionFreeList::close : null
);
}
@@ -152,17 +127,6 @@
}
/**
- * Returns table free list.
- *
- * @throws StorageException If the data region did not start.
- */
- public TableFreeList tableFreeList() {
- checkDataRegionStarted();
-
- return tableFreeList;
- }
-
- /**
* Returns version chain free list.
*
* @throws StorageException If the data region did not start.
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryPartitionStorage.java
deleted file mode 100644
index ae3ad52..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryPartitionStorage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import org.apache.ignite.internal.pagememory.tree.BplusTree;
-import org.apache.ignite.internal.storage.PartitionStorage;
-
-/**
- * Implementation of {@link PartitionStorage} based on a {@link BplusTree} for in-memory case.
- */
-class VolatilePageMemoryPartitionStorage extends AbstractPageMemoryPartitionStorage {
- /**
- * Constructor.
- *
- * @param partId Partition id.
- * @param freeList Table free list.
- * @param tree Table tree.
- */
- public VolatilePageMemoryPartitionStorage(
- int partId,
- TableFreeList freeList,
- TableTree tree
- ) {
- super(partId, freeList, tree);
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index 3af642d..c5e2880 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -92,7 +92,7 @@
/** {@inheritDoc} */
@Override
- public VolatilePageMemoryTableStorage createTable(TableConfiguration tableCfg) {
+ public VolatilePageMemoryTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
TableView tableView = tableCfg.value();
assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
@@ -102,12 +102,6 @@
return new VolatilePageMemoryTableStorage(tableCfg, regions.get(dataStorageView.dataRegion()));
}
- /** {@inheritDoc} */
- @Override
- public VolatilePageMemoryTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
- return createTable(tableCfg);
- }
-
/**
* Creates, starts and adds a new data region to the engine.
*
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 17bd31f..f1f33a4 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -48,18 +48,6 @@
/** {@inheritDoc} */
@Override
- protected VolatilePageMemoryPartitionStorage createPartitionStorage(int partId) throws StorageException {
- TableTree tableTree = createTableTree(partId, tableCfg.value());
-
- return new VolatilePageMemoryPartitionStorage(
- partId,
- dataRegion.tableFreeList(),
- tableTree
- );
- }
-
- /** {@inheritDoc} */
- @Override
public VolatilePageMemoryMvPartitionStorage createMvPartitionStorage(int partitionId) throws StorageException {
VersionChainTree versionChainTree = createVersionChainTree(partitionId, tableCfg.value());
@@ -85,37 +73,7 @@
}
/**
- * Returns new {@link TableTree} instance for partition.
- *
- * @param partId Partition ID.
- * @param tableView Table configuration.
- * @throws StorageException If failed.
- */
- TableTree createTableTree(int partId, TableView tableView) throws StorageException {
- int grpId = tableView.tableId();
-
- try {
- return new TableTree(
- grpId,
- tableView.name(),
- partId,
- dataRegion.pageMemory(),
- PageLockListenerNoOp.INSTANCE,
- new AtomicLong(),
- dataRegion.pageMemory().allocatePage(grpId, partId, FLAG_AUX),
- dataRegion.tableFreeList(),
- true
- );
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException(
- String.format("Error creating TableTree [tableName=%s, partitionId=%s]", tableView.name(), partId),
- e
- );
- }
- }
-
- /**
- * Returns new {@link TableTree} instance for partition.
+ * Returns new {@link VersionChainTree} instance for partition.
*
* @param partId Partition ID.
* @param tableView Table configuration.
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/RowIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/RowIo.java
deleted file mode 100644
index cffff78..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/RowIo.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory.io;
-
-/**
- * Interface for row IO.
- */
-public interface RowIo {
- /**
- * Returns the link for the row in the page by index.
- *
- * @param pageAddr Page address.
- * @param idx Index.
- */
- long link(long pageAddr, int idx);
-
- /**
- * Returns the hash for the row in the page by index.
- *
- * @param pageAddr Page address.
- * @param idx Index.
- */
- int hash(long pageAddr, int idx);
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableDataIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableDataIo.java
deleted file mode 100644
index c82cc77..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableDataIo.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory.io;
-
-import static org.apache.ignite.internal.pagememory.util.PageUtils.putByteBuffer;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
-import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.storage.pagememory.TableDataRow;
-import org.apache.ignite.internal.storage.pagememory.TableTree;
-import org.apache.ignite.lang.IgniteStringBuilder;
-
-/**
- * Data pages IO for {@link TableTree}.
- */
-public class TableDataIo extends AbstractDataPageIo<TableDataRow> {
- /** Page IO type. */
- public static final short T_TABLE_DATA_IO = 6;
-
- /** I/O versions. */
- public static final IoVersions<TableDataIo> VERSIONS = new IoVersions<>(new TableDataIo(1));
-
- /**
- * Constructor.
- *
- * @param ver Page format version.
- */
- protected TableDataIo(int ver) {
- super(T_TABLE_DATA_IO, ver);
- }
-
- /** {@inheritDoc} */
- @Override
- protected void writeRowData(long pageAddr, int dataOff, int payloadSize, TableDataRow row, boolean newRow) {
- assertPageType(pageAddr);
-
- long addr = pageAddr + dataOff;
-
- if (newRow) {
- putShort(addr, 0, (short) payloadSize);
- addr += 2;
-
- ByteBuffer key = row.key();
-
- putInt(addr, 0, key.limit());
- addr += 4;
-
- putByteBuffer(addr, 0, key);
- addr += key.limit();
- } else {
- addr += 2 + 4 + row.key().limit();
- }
-
- ByteBuffer value = row.value();
-
- putInt(addr, 0, value.limit());
- addr += 4;
-
- putByteBuffer(addr, 0, value);
- }
-
- /** {@inheritDoc} */
- @Override
- protected void writeFragmentData(TableDataRow row, ByteBuffer buf, int rowOff, int payloadSize) {
- assertPageType(buf);
-
- ByteBuffer key = row.key();
-
- int written = writeFragmentByteBuffer(buf, rowOff, 0, payloadSize, key);
-
- written += writeFragmentByteBuffer(buf, rowOff + written, 4 + key.limit(), payloadSize - written, row.value());
-
- assert written == payloadSize;
- }
-
- /** {@inheritDoc} */
- @Override
- protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
- sb.app("TableDataIo [\n");
- printPageLayout(addr, pageSize, sb);
- sb.app("\n]");
- }
-
- private int writeFragmentByteBuffer(
- ByteBuffer bufWriteTo,
- int rowOff,
- int expOff,
- int payloadSize,
- ByteBuffer bufReadFrom
- ) {
- if (payloadSize == 0) {
- // No space left to write.
- return 0;
- }
-
- if (rowOff >= expOff + 4 + bufReadFrom.limit()) {
- // Already fully written to the buffer.
- return 0;
- }
-
- int len = Math.min(payloadSize, expOff + 4 + bufReadFrom.limit() - rowOff);
-
- putValue(bufWriteTo, rowOff - expOff, len, bufReadFrom);
-
- return len;
- }
-
- private void putValue(
- ByteBuffer bufWriteTo,
- int off,
- int len,
- ByteBuffer bufReadFrom
- ) {
- if (off == 0 && len >= 4) {
- bufWriteTo.putInt(bufReadFrom.limit());
-
- len -= 4;
- } else if (off >= 4) {
- off -= 4;
- } else {
- // Partial length write.
- ByteBuffer tmp = ByteBuffer.allocate(4);
-
- tmp.order(bufWriteTo.order());
-
- tmp.putInt(bufReadFrom.limit());
-
- tmp.position(off);
-
- if (len < tmp.capacity()) {
- tmp.limit(off + Math.min(len, tmp.capacity() - off));
- }
-
- bufWriteTo.put(tmp);
-
- if (tmp.limit() < 4) {
- return;
- }
-
- len -= 4 - off;
- off = 0;
- }
-
- int oldBufLimit = bufReadFrom.limit();
-
- bufReadFrom.position(off);
-
- if (len < bufReadFrom.capacity()) {
- bufReadFrom.limit(off + Math.min(len, bufReadFrom.capacity() - off));
- }
-
- bufWriteTo.put(bufReadFrom);
-
- bufReadFrom.limit(oldBufLimit);
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableInnerIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableInnerIo.java
deleted file mode 100644
index 1ad48f0..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableInnerIo.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory.io;
-
-import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
-import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
-
-import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.pagememory.tree.BplusTree;
-import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
-import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
-import org.apache.ignite.internal.storage.pagememory.TableSearchRow;
-import org.apache.ignite.internal.storage.pagememory.TableTree;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-
-/**
- * IO routines for {@link TableTree} inner pages.
- *
- * <p>Structure: hash(int) + link(long).
- */
-public class TableInnerIo extends BplusInnerIo<TableSearchRow> implements RowIo {
- private static final int LINK_OFFSET = 4;
-
- /** Page IO type. */
- public static final short T_TABLE_INNER_IO = 4;
-
- /** I/O versions. */
- public static final IoVersions<TableInnerIo> VERSIONS = new IoVersions<>(new TableInnerIo(1));
-
- /**
- * Constructor.
- *
- * @param ver Page format version.
- */
- protected TableInnerIo(int ver) {
- super(
- T_TABLE_INNER_IO,
- ver,
- true,
- Integer.BYTES + Long.BYTES // hash(int) + link(long);
- );
- }
-
- /** {@inheritDoc} */
- @Override
- public void store(long dstPageAddr, int dstIdx, BplusIo<TableSearchRow> srcIo, long srcPageAddr, int srcIdx) {
- assertPageType(dstPageAddr);
-
- int srcHash = hash(srcPageAddr, srcIdx);
- long srcLink = link(srcPageAddr, srcIdx);
-
- int dstOff = offset(dstIdx);
-
- putInt(dstPageAddr, dstOff, srcHash);
- dstOff += LINK_OFFSET;
-
- putLong(dstPageAddr, dstOff, srcLink);
- }
-
- /** {@inheritDoc} */
- @Override
- public void storeByOffset(long pageAddr, int off, TableSearchRow row) {
- assertPageType(pageAddr);
-
- putInt(pageAddr, off, row.hash());
- off += LINK_OFFSET;
-
- putLong(pageAddr, off, row.link());
- }
-
- /** {@inheritDoc} */
- @Override
- public TableSearchRow getLookupRow(BplusTree<TableSearchRow, ?> tree, long pageAddr, int idx) throws IgniteInternalCheckedException {
- int hash = hash(pageAddr, idx);
- long link = link(pageAddr, idx);
-
- return ((TableTree) tree).getRowByLink(link, hash, KEY_ONLY);
- }
-
- /** {@inheritDoc} */
- @Override
- public long link(long pageAddr, int idx) {
- assert idx < getCount(pageAddr) : idx;
-
- return getLong(pageAddr, offset(idx) + LINK_OFFSET);
- }
-
- /** {@inheritDoc} */
- @Override
- public int hash(long pageAddr, int idx) {
- assert idx < getCount(pageAddr) : idx;
-
- return getInt(pageAddr, offset(idx));
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableLeafIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableLeafIo.java
deleted file mode 100644
index 91e5237..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableLeafIo.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory.io;
-
-import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
-import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
-import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
-
-import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.pagememory.tree.BplusTree;
-import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
-import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
-import org.apache.ignite.internal.storage.pagememory.TableSearchRow;
-import org.apache.ignite.internal.storage.pagememory.TableTree;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-
-/**
- * IO routines for {@link TableTree} leaf pages.
- *
- * <p>Structure: hash(int) + link(long).
- */
-public class TableLeafIo extends BplusLeafIo<TableSearchRow> implements RowIo {
- private static final int LINK_OFFSET = 4;
-
- /** Page IO type. */
- public static final short T_TABLE_LEAF_IO = 5;
-
- /** I/O versions. */
- public static final IoVersions<TableLeafIo> VERSIONS = new IoVersions<>(new TableLeafIo(1));
-
- /**
- * Constructor.
- *
- * @param ver Page format version.
- */
- protected TableLeafIo(int ver) {
- super(
- T_TABLE_LEAF_IO,
- ver,
- Integer.BYTES + Long.BYTES // hash(int) + link(long);
- );
- }
-
- /** {@inheritDoc} */
- @Override
- public void store(long dstPageAddr, int dstIdx, BplusIo<TableSearchRow> srcIo, long srcPageAddr, int srcIdx) {
- assertPageType(dstPageAddr);
-
- int srcHash = hash(srcPageAddr, srcIdx);
- long srcLink = link(srcPageAddr, srcIdx);
-
- int dstOff = offset(dstIdx);
-
- putInt(dstPageAddr, dstOff, srcHash);
- dstOff += LINK_OFFSET;
-
- putLong(dstPageAddr, dstOff, srcLink);
- }
-
- /** {@inheritDoc} */
- @Override
- public void storeByOffset(long pageAddr, int off, TableSearchRow row) {
- assertPageType(pageAddr);
-
- putInt(pageAddr, off, row.hash());
- off += LINK_OFFSET;
-
- putLong(pageAddr, off, row.link());
- }
-
- /** {@inheritDoc} */
- @Override
- public TableSearchRow getLookupRow(BplusTree<TableSearchRow, ?> tree, long pageAddr, int idx) throws IgniteInternalCheckedException {
- int hash = hash(pageAddr, idx);
- long link = link(pageAddr, idx);
-
- return ((TableTree) tree).getRowByLink(link, hash, KEY_ONLY);
- }
-
- /** {@inheritDoc} */
- @Override
- public long link(long pageAddr, int idx) {
- assert idx < getCount(pageAddr) : idx;
-
- return getLong(pageAddr, offset(idx) + LINK_OFFSET);
- }
-
- /** {@inheritDoc} */
- @Override
- public int hash(long pageAddr, int idx) {
- assert idx < getCount(pageAddr) : idx;
-
- return getInt(pageAddr, offset(idx));
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableMetaIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableMetaIo.java
deleted file mode 100644
index 00687a9..0000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableMetaIo.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory.io;
-
-import org.apache.ignite.internal.pagememory.io.IoVersions;
-import org.apache.ignite.internal.pagememory.tree.io.BplusMetaIo;
-import org.apache.ignite.internal.storage.pagememory.TableTree;
-
-/**
- * IO routines for {@link TableTree} meta pages.
- */
-public class TableMetaIo extends BplusMetaIo {
- /** Page IO type. */
- public static final short T_TABLE_META_IO = 3;
-
- /** I/O versions. */
- public static final IoVersions<TableMetaIo> VERSIONS = new IoVersions<>(new TableMetaIo(1));
-
- /**
- * Constructor.
- *
- * @param ver Page format version.
- */
- protected TableMetaIo(int ver) {
- super(T_TABLE_META_IO, ver);
- }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index a358bed..823ec14 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -299,7 +299,6 @@
/** {@inheritDoc} */
@Override
public void commitWrite(RowId rowId, Timestamp timestamp) throws StorageException {
-
VersionChain currentVersionChain = findVersionChain(rowId);
if (currentVersionChain == null || currentVersionChain.transactionId() == null) {
@@ -393,13 +392,26 @@
versionChainTree.close();
}
+ /**
+ * Removes all data from this storage and frees all associated resources.
+ *
+ * @throws StorageException If failed to destroy the data or storage is already stopped.
+ */
+ public void destroy() {
+ // TODO: IGNITE-17132 Implement it
+ }
+
private class ScanCursor implements Cursor<BinaryRow> {
private final IgniteCursor<VersionChain> treeCursor;
+
private final Predicate<BinaryRow> keyFilter;
+
private final @Nullable UUID transactionId;
+
private final @Nullable Timestamp timestamp;
private BinaryRow nextRow = null;
+
private boolean iterationExhausted = false;
public ScanCursor(
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java
index dab84e7..26bb54a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java
@@ -30,8 +30,8 @@
import org.apache.ignite.internal.pagememory.util.PageIdUtils;
/**
- * Handling of <em>partitionless links</em>, that is, page memory links from which partition ID is removed.
- * They are used to spare storage space in cases when we know the partition ID from the context.
+ * Handling of <em>partitionless links</em>, that is, page memory links from which partition ID is removed. They are used to spare storage
+ * space in cases when we know the partition ID from the context.
*
* @see PageIdUtils#link(long, int)
*/
@@ -70,9 +70,9 @@
/**
* Writes a partitionless link to memory: first high 2 bytes, then low 4 bytes.
*
- * @param addr address in memory where to start
- * @param link the link to write
- * @return number of bytes written (equal to {@link #PARTITIONLESS_LINK_SIZE_BYTES})
+ * @param addr Address in memory where to start.
+ * @param link The link to write.
+ * @return Number of bytes written (equal to {@link #PARTITIONLESS_LINK_SIZE_BYTES}).
*/
public static long writePartitionlessLink(long addr, long link) {
putShort(addr, 0, (short) tag(link));
@@ -93,8 +93,4 @@
buffer.putInt(pageIndex(link));
}
-
- private PartitionlessLinks() {
- // prevent instantiation
- }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
index b556985..bb6f7bd 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
@@ -39,8 +39,9 @@
private boolean readingFirstSlot = true;
private long firstFragmentLink;
- @Nullable
- private Timestamp timestamp;
+
+ private @Nullable Timestamp timestamp;
+
private long nextLink;
private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue();
@@ -49,10 +50,12 @@
this.partitionId = partitionId;
}
+ /** {@inheritDoc} */
@Override
public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Predicate<Timestamp> loadValue) {
if (readingFirstSlot) {
readingFirstSlot = false;
+
return readFullOrInitiateReadFragmented(link, pageAddr, payload, loadValue);
} else {
return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
@@ -67,6 +70,7 @@
if (!loadValue.test(timestamp)) {
result = new RowVersion(partitionIdFromLink(link), firstFragmentLink, timestamp, nextLink, null);
+
return STOP_TRAVERSAL;
}
@@ -77,6 +81,7 @@
return PageIdUtils.partitionId(PageIdUtils.pageId(link));
}
+ /** {@inheritDoc} */
@Override
public void finish() {
if (result != null) {
@@ -87,7 +92,9 @@
readRowVersionValue.finish();
byte[] valueBytes = readRowVersionValue.result();
+
ByteBuffer value = ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER);
+
result = new RowVersion(partitionIdFromLink(firstFragmentLink), firstFragmentLink, timestamp, nextLink, value);
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
index b1a3a3e..8419151 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
@@ -23,11 +23,13 @@
* Reads {@link RowVersion#value()} from page-memory.
*/
class ReadRowVersionValue extends ReadPageMemoryRowValue {
+ /** {@inheritDoc} */
@Override
protected int valueSizeOffsetInFirstSlot() {
return RowVersion.VALUE_SIZE_OFFSET;
}
+ /** {@inheritDoc} */
@Override
protected int valueOffsetInFirstSlot() {
return RowVersion.VALUE_OFFSET;
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index 787a0ad..fc4baf5 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -32,14 +32,10 @@
* Represents row version inside row version chain.
*/
public class RowVersion implements Storable {
- /**
- * A 'timestamp' representing absense of a timestamp.
- */
+ /** A 'timestamp' representing absense of a timestamp. */
public static final Timestamp NULL_TIMESTAMP = new Timestamp(Long.MIN_VALUE, Long.MIN_VALUE);
- /**
- * Represents an absent partitionless link.
- */
+ /** Represents an absent partitionless link. */
public static final long NULL_LINK = 0;
private static final int TIMESTAMP_STORE_SIZE_BYTES = 2 * Long.BYTES;
@@ -52,15 +48,17 @@
public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + VALUE_SIZE_STORE_SIZE_BYTES;
private final int partitionId;
+
private long link;
- @Nullable
- private final Timestamp timestamp;
+ private final @Nullable Timestamp timestamp;
+
private final long nextLink;
+
private final int valueSize;
+
@IgniteToStringExclude
- @Nullable
- private final ByteBuffer value;
+ private final @Nullable ByteBuffer value;
/**
* Constructor.
@@ -84,8 +82,7 @@
this.value = value;
}
- @Nullable
- public Timestamp timestamp() {
+ public @Nullable Timestamp timestamp() {
return timestamp;
}
@@ -93,14 +90,12 @@
return timestampForStorage(timestamp);
}
- static Timestamp timestampForStorage(Timestamp timestamp) {
+ static Timestamp timestampForStorage(@Nullable Timestamp timestamp) {
return timestamp == null ? NULL_TIMESTAMP : timestamp;
}
/**
* Returns partitionless link of the next version or {@code 0} if this version is the last in the chain (i.e. it's the oldest version).
- *
- * @return partitionless link of the next version or {@code 0} if this version is the last in the chain
*/
public long nextLink() {
return nextLink;
@@ -142,21 +137,25 @@
return timestamp != null;
}
+ /** {@inheritDoc} */
@Override
public final void link(long link) {
this.link = link;
}
+ /** {@inheritDoc} */
@Override
public final long link() {
return link;
}
+ /** {@inheritDoc} */
@Override
public final int partition() {
return partitionId;
}
+ /** {@inheritDoc} */
@Override
public int size() {
assert value != null;
@@ -164,16 +163,19 @@
return TIMESTAMP_STORE_SIZE_BYTES + NEXT_LINK_STORE_SIZE_BYTES + VALUE_SIZE_STORE_SIZE_BYTES + value.limit();
}
+ /** {@inheritDoc} */
@Override
public int headerSize() {
return TIMESTAMP_STORE_SIZE_BYTES + NEXT_LINK_STORE_SIZE_BYTES + VALUE_SIZE_STORE_SIZE_BYTES;
}
+ /** {@inheritDoc} */
@Override
- public IoVersions<? extends AbstractDataPageIo> ioVersions() {
+ public IoVersions<? extends AbstractDataPageIo<?>> ioVersions() {
return RowVersionDataIo.VERSIONS;
}
+ /** {@inheritDoc} */
@Override
public String toString() {
return S.toString(RowVersion.class, this);
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
index 94a6add..83ce3fe 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
@@ -35,11 +35,8 @@
class ScanVersionChainByTimestamp implements PageMemoryTraversal<Timestamp> {
private final int partitionId;
- /**
- * Contains the result when the traversal ends.
- */
- @Nullable
- private ByteBufferRow result;
+ /** Contains the result when the traversal ends. */
+ private @Nullable ByteBufferRow result;
/**
* First it's {@code true} (this means that we traverse first slots of versions of the Version Chain using NextLink);
@@ -53,6 +50,7 @@
this.partitionId = partitionId;
}
+ /** {@inheritDoc} */
@Override
public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Timestamp timestamp) {
if (lookingForVersion) {
@@ -93,6 +91,7 @@
return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
}
+ /** {@inheritDoc} */
@Override
public void finish() {
if (lookingForVersion) {
@@ -112,8 +111,7 @@
}
}
- @Nullable
- ByteBufferRow result() {
+ @Nullable ByteBufferRow result() {
return result;
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
index d722a38..bbe6ba6 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
@@ -31,12 +31,10 @@
/**
* Reads a {@link Timestamp} value from memory.
*
- * @param pageAddr address where page data starts
- * @param offset offset to the timestamp value relative to pageAddr
- * @return the timestamp
+ * @param pageAddr Address where page data starts.
+ * @param offset Offset to the timestamp value relative to pageAddr.
*/
- @Nullable
- static Timestamp readTimestamp(long pageAddr, int offset) {
+ static @Nullable Timestamp readTimestamp(long pageAddr, int offset) {
long nodeId = getLong(pageAddr, offset);
long localTimestamp = getLong(pageAddr, offset + Long.BYTES);
@@ -51,10 +49,10 @@
/**
* Writes a {@link Timestamp} to memory starting at the given address + offset.
*
- * @param addr memory address
- * @param offset offset added to the address
- * @param timestamp the timestamp to write
- * @return number of bytes written
+ * @param addr Memory address.
+ * @param offset Offset added to the address.
+ * @param timestamp The timestamp to write.
+ * @return Number of bytes written.
*/
public static int writeTimestampToMemory(long addr, int offset, @Nullable Timestamp timestamp) {
Timestamp timestampForStorage = RowVersion.timestampForStorage(timestamp);
@@ -68,8 +66,8 @@
/**
* Writes a {@link Timestamp} to a buffer.
*
- * @param buffer buffer to which to write
- * @param timestamp the timestamp to write
+ * @param buffer Buffer to which to write.
+ * @param timestamp The timestamp to write.
*/
public static void writeTimestampToBuffer(ByteBuffer buffer, @Nullable Timestamp timestamp) {
Timestamp timestampForStorage = RowVersion.timestampForStorage(timestamp);
@@ -77,8 +75,4 @@
buffer.putLong(timestampForStorage.getNodeId());
buffer.putLong(timestampForStorage.getTimestamp());
}
-
- private Timestamps() {
- // prevent instantiation
- }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
index 9cc73ec..022c1cf 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
@@ -30,17 +30,12 @@
public class VersionChain extends VersionChainKey {
public static final long NULL_UUID_COMPONENT = 0;
- @Nullable
- private final UUID transactionId;
+ private final @Nullable UUID transactionId;
- /**
- * Link to the most recent version.
- */
+ /** Link to the most recent version. */
private final long headLink;
- /**
- * Link to the newest committed {@link RowVersion} if head is not yet committed, or {@link RowVersion#NULL_LINK} otherwise.
- */
+ /** Link to the newest committed {@link RowVersion} if head is not yet committed, or {@link RowVersion#NULL_LINK} otherwise. */
private final long nextLink;
/**
@@ -56,8 +51,7 @@
/**
* Returns a transaction id, associated with a chain's head, or {@code null} if head is already committed.
*/
- @Nullable
- public UUID transactionId() {
+ public @Nullable UUID transactionId() {
return transactionId;
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index a29064c..dbcbd7c 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -28,9 +28,7 @@
* Implementation of {@link MvPartitionStorage} based on a {@link BplusTree} for in-memory case.
*/
public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPartitionStorage {
- /**
- * Last applied index value.
- */
+ /** Last applied index value. */
private volatile long lastAppliedIndex;
/**
diff --git a/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule b/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
index 639bb65..017911e 100644
--- a/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
+++ b/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
@@ -14,5 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-org.apache.ignite.internal.storage.pagememory.PageMemoryStorageIoModule
org.apache.ignite.internal.storage.pagememory.mv.PageMemoryMvStorageIoModule
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryPartitionStorageTest.java
deleted file mode 100644
index 84b03ef..0000000
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryPartitionStorageTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.nio.file.Path;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.TableConfiguration;
-import org.apache.ignite.internal.components.LongJvmPauseDetector;
-import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
-import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import org.apache.ignite.internal.storage.AbstractPartitionStorageTest;
-import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.engine.TableStorage;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryDataStorageChange;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryDataStorageConfigurationSchema;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryDataStorageView;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfigurationSchema;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-/**
- * Storage test implementation for {@link PersistentPageMemoryPartitionStorage}.
- */
-@ExtendWith(ConfigurationExtension.class)
-@ExtendWith(WorkDirectoryExtension.class)
-public class PersistentPageMemoryPartitionStorageTest extends AbstractPartitionStorageTest {
- private static PageIoRegistry ioRegistry;
-
- @InjectConfiguration(polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class)
- PersistentPageMemoryStorageEngineConfiguration engineConfig;
-
- @InjectConfiguration(
- name = "table",
- polymorphicExtensions = {
- HashIndexConfigurationSchema.class,
- UnknownDataStorageConfigurationSchema.class,
- PersistentPageMemoryDataStorageConfigurationSchema.class,
- ConstantValueDefaultConfigurationSchema.class,
- FunctionCallDefaultConfigurationSchema.class,
- NullValueDefaultConfigurationSchema.class,
- }
- )
- private TableConfiguration tableCfg;
-
- private LongJvmPauseDetector longJvmPauseDetector;
-
- private PersistentPageMemoryStorageEngine engine;
-
- private TableStorage table;
-
- @WorkDirectory
- private Path workDir;
-
- @BeforeAll
- static void beforeAll() {
- ioRegistry = new PageIoRegistry();
-
- ioRegistry.loadFromServiceLoader();
- }
-
- @BeforeEach
- void setUp() throws Exception {
- String nodeName = "test-node";
-
- longJvmPauseDetector = new LongJvmPauseDetector(nodeName, Loggers.forClass(LongJvmPauseDetector.class));
-
- longJvmPauseDetector.start();
-
- engine = new PersistentPageMemoryStorageEngine(nodeName, engineConfig, ioRegistry, workDir, longJvmPauseDetector);
-
- engine.start();
-
- tableCfg
- .change(c -> c.changeDataStorage(dsc -> dsc.convert(PersistentPageMemoryDataStorageChange.class)))
- .get(1, TimeUnit.SECONDS);
-
- assertEquals(
- PersistentPageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME,
- ((PersistentPageMemoryDataStorageView) tableCfg.dataStorage().value()).dataRegion()
- );
-
- table = engine.createTable(tableCfg);
-
- assertThat(table, is(instanceOf(PersistentPageMemoryTableStorage.class)));
-
- table.start();
-
- storage = table.getOrCreatePartition(0);
-
- assertThat(storage, is(instanceOf(PersistentPageMemoryPartitionStorage.class)));
- }
-
- @AfterEach
- void tearDown() throws Exception {
- IgniteUtils.closeAll(
- storage,
- table == null ? null : table::stop,
- engine == null ? null : engine::stop,
- longJvmPauseDetector == null ? null : longJvmPauseDetector::stop
- );
- }
-
- @AfterAll
- static void afterAll() {
- ioRegistry = null;
- }
-
- /** {@inheritDoc} */
- @Test
- @Override
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-16644")
- public void testSnapshot(@WorkDirectory Path workDir) throws Exception {
- super.testSnapshot(workDir);
- }
-
- @Test
- void testReadAfterRestart() throws Exception {
- List<DataRow> rows = IntStream.range(0, 100)
- .mapToObj(i -> dataRow(KEY + i, VALUE + i))
- .collect(Collectors.toList());
-
- storage.writeAll(rows);
-
- engine
- .checkpointManager()
- .forceCheckpoint("before_stop_engine")
- .futureFor(FINISHED)
- .get(1, TimeUnit.SECONDS);
-
- tearDown();
-
- setUp();
-
- rows.forEach(this::checkHasSameEntry);
- }
-}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryPartitionStorageTest.java
deleted file mode 100644
index cb7fe70..0000000
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryPartitionStorageTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import static java.util.stream.Collectors.joining;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.nio.file.Path;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.TableConfiguration;
-import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
-import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import org.apache.ignite.internal.storage.AbstractPartitionStorageTest;
-import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.engine.TableStorage;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageChange;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageConfigurationSchema;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageView;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfigurationSchema;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-/**
- * Storage test implementation for {@link VolatilePageMemoryPartitionStorage}.
- */
-@ExtendWith(ConfigurationExtension.class)
-@ExtendWith(WorkDirectoryExtension.class)
-public class VolatilePageMemoryPartitionStorageTest extends AbstractPartitionStorageTest {
- private static PageIoRegistry ioRegistry;
-
- @InjectConfiguration(polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class)
- VolatilePageMemoryStorageEngineConfiguration engineConfig;
-
- @InjectConfiguration(
- name = "table",
- polymorphicExtensions = {
- HashIndexConfigurationSchema.class,
- UnknownDataStorageConfigurationSchema.class,
- VolatilePageMemoryDataStorageConfigurationSchema.class,
- ConstantValueDefaultConfigurationSchema.class,
- FunctionCallDefaultConfigurationSchema.class,
- NullValueDefaultConfigurationSchema.class,
- }
- )
- private TableConfiguration tableCfg;
-
- private VolatilePageMemoryStorageEngine engine;
-
- private TableStorage table;
-
- @WorkDirectory
- private Path workDir;
-
- @BeforeAll
- static void beforeAll() {
- ioRegistry = new PageIoRegistry();
-
- ioRegistry.loadFromServiceLoader();
- }
-
- @BeforeEach
- void setUp() throws Exception {
- engine = new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
-
- engine.start();
-
- tableCfg
- .change(c -> c.changeDataStorage(dsc -> dsc.convert(VolatilePageMemoryDataStorageChange.class)))
- .get(1, TimeUnit.SECONDS);
-
- assertEquals(
- VolatilePageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME,
- ((VolatilePageMemoryDataStorageView) tableCfg.dataStorage().value()).dataRegion()
- );
-
- table = engine.createTable(tableCfg);
-
- assertThat(table, is(instanceOf(VolatilePageMemoryTableStorage.class)));
-
- table.start();
-
- storage = table.getOrCreatePartition(0);
-
- assertThat(storage, is(instanceOf(VolatilePageMemoryPartitionStorage.class)));
- }
-
- @AfterEach
- void tearDown() throws Exception {
- IgniteUtils.closeAll(
- storage,
- table == null ? null : table::stop,
- engine == null ? null : engine::stop
- );
- }
-
- @AfterAll
- static void afterAll() {
- ioRegistry = null;
- }
-
- /** {@inheritDoc} */
- @Test
- @Override
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-16644")
- public void testSnapshot(@WorkDirectory Path workDir) throws Exception {
- super.testSnapshot(workDir);
- }
-
- /**
- * Checks that fragments are written and read correctly.
- *
- * @throws Exception If failed.
- */
- @Test
- void testFragments() {
- int pageSize = engineConfig.pageSize().value();
-
- DataRow dataRow = dataRow(createRandomString(pageSize), createRandomString(pageSize));
-
- storage.write(dataRow);
-
- DataRow read = storage.read(dataRow);
-
- assertArrayEquals(dataRow.valueBytes(), read.valueBytes());
- }
-
- private String createRandomString(int len) {
- return ThreadLocalRandom.current().ints(len).mapToObj(i -> String.valueOf(Math.abs(i % 10))).collect(joining(""));
- }
-}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index 8295f79..98e4869 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -89,7 +89,7 @@
((PersistentPageMemoryDataStorageView) tableCfg.dataStorage().value()).dataRegion()
);
- table = engine.createTable(tableCfg);
+ table = engine.createMvTable(tableCfg);
table.start();
storage = table.createMvPartitionStorage(PARTITION_ID);
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
index 2a35f85..53002c4 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
@@ -80,7 +80,7 @@
((VolatilePageMemoryDataStorageView) tableCfg.dataStorage().value()).dataRegion()
);
- table = engine.createTable(tableCfg);
+ table = engine.createMvTable(tableCfg);
table.start();
storage = table.createMvPartitionStorage(PARTITION_ID);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
index 92b1b41..69d13ca 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
@@ -21,6 +21,7 @@
import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_COMPLETED;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -70,9 +71,14 @@
/** {@inheritDoc} */
@Override
public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
+ ExecutorService threadPool = tableStorage.engine().threadPool();
+
if (lastEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
- lastFlushProcessed = CompletableFuture.runAsync(this::refreshPersistedIndexes, tableStorage.engine().threadPool());
+ lastFlushProcessed = CompletableFuture.runAsync(this::refreshPersistedIndexes, threadPool);
}
+
+ // Do it for every column family, there's no way to tell in advance which one has the latest sequence number.
+ lastFlushProcessed.whenCompleteAsync((o, throwable) -> tableStorage.completeFutures(flushJobInfo.getLargestSeqno()), threadPool);
}
private void refreshPersistedIndexes() {
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index f9edc5c..3316067 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -27,14 +27,9 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
@@ -146,9 +141,6 @@
/** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
private volatile long persistedIndex;
- /** Map with flush futures by applied index at the time of the {@link #flush()} call. */
- private final ConcurrentMap<Long, CompletableFuture<Void>> flushFuturesByAppliedIndex = new ConcurrentHashMap<>();
-
/**
* Constructor.
*
@@ -202,13 +194,7 @@
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> flush() {
- CompletableFuture<Void> flushFuture = flushFuturesByAppliedIndex.computeIfAbsent(
- lastAppliedIndex, index -> new CompletableFuture<>()
- );
-
- tableStorage.scheduleFlush();
-
- return flushFuture;
+ return tableStorage.awaitFlush(true);
}
/** {@inheritDoc} */
@@ -241,27 +227,10 @@
* Reads a value of {@link #lastAppliedIndex()} from the storage, avoiding memtable, and sets it as a new value of
* {@link #persistedIndex()}.
*
- * <p/>All futures returned by {@link #flush()} are completed here if they correspond to the value of {@link #persistedIndex()}
- * (if flush was called before data started being flushed to the storage).
- *
* @throws StorageException If failed to read index from the storage.
*/
public void refreshPersistedIndex() throws StorageException {
- long persistedIndex = readLastAppliedIndex(persistedTierReadOpts);
-
- this.persistedIndex = persistedIndex;
-
- Set<Entry<Long, CompletableFuture<Void>>> entries = flushFuturesByAppliedIndex.entrySet();
-
- for (Iterator<Entry<Long, CompletableFuture<Void>>> iterator = entries.iterator(); iterator.hasNext(); ) {
- Entry<Long, CompletableFuture<Void>> entry = iterator.next();
-
- if (persistedIndex >= entry.getKey()) {
- entry.getValue().complete(null);
-
- iterator.remove();
- }
- }
+ persistedIndex = readLastAppliedIndex(persistedTierReadOpts);
}
/**
@@ -792,10 +761,6 @@
/** {@inheritDoc} */
@Override
public void close() throws Exception {
- for (CompletableFuture<Void> future : flushFuturesByAppliedIndex.values()) {
- future.cancel(false);
- }
-
IgniteUtils.closeAll(persistedTierReadOpts, readOpts, writeOpts, upperBound);
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
deleted file mode 100644
index cf49f95..0000000
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.rocksdb;
-
-import static java.util.Collections.nCopies;
-import static org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.range;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Predicate;
-import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
-import org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange;
-import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
-import org.apache.ignite.internal.storage.DataRow;
-import org.apache.ignite.internal.storage.InvokeClosure;
-import org.apache.ignite.internal.storage.PartitionStorage;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.StorageUtils;
-import org.apache.ignite.internal.storage.basic.DelegatingDataRow;
-import org.apache.ignite.internal.storage.basic.SimpleDataRow;
-import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.jetbrains.annotations.Nullable;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Slice;
-import org.rocksdb.WriteBatch;
-import org.rocksdb.WriteOptions;
-
-/**
- * Storage implementation based on a single RocksDB instance.
- */
-class RocksDbPartitionStorage implements PartitionStorage {
- /**
- * Size of the overhead for all keys in the storage: partition ID (unsigned {@code short}) + key hash ({@code int}).
- */
- private static final int PARTITION_KEY_PREFIX_SIZE = Short.BYTES + Integer.BYTES;
-
- /**
- * Partition ID (should be treated as an unsigned short).
- *
- * <p>Partition IDs are always stored in the big endian order, since they need to be compared lexicographically.
- */
- private final int partId;
-
- /** RocksDb instance. */
- private final RocksDB db;
-
- /** Data column family. */
- private final ColumnFamily data;
-
- /** Snapshot manager. */
- private final RocksSnapshotManager snapshotManager;
-
- /**
- * Lock used to insure thread-safety of the {@link #restoreSnapshot} method.
- */
- private final Object snapshotRestoreLock = new Object();
-
- /**
- * Constructor.
- *
- * @param partId Partition id.
- * @param db Rocks DB instance.
- * @param columnFamily Column family to be used for all storage operations. This class does not own the column family handler
- * as it is shared between multiple storages and will not close it.
- * @param threadPool Thread pool for async operations.
- * @throws StorageException If failed to create RocksDB instance.
- */
- RocksDbPartitionStorage(RocksDB db, ColumnFamily columnFamily, int partId, Executor threadPool) throws StorageException {
- assert partId >= 0 && partId < 0xFFFF : partId;
-
- this.partId = partId;
- this.db = db;
- this.data = columnFamily;
-
- ColumnFamilyRange snapshotRange = range(columnFamily, partitionStartPrefix(), partitionEndPrefix());
-
- this.snapshotManager = new RocksSnapshotManager(db, List.of(snapshotRange), threadPool);
- }
-
- /** {@inheritDoc} */
- @Override
- public int partitionId() {
- return partId;
- }
-
- /** {@inheritDoc} */
- @Override
- @Nullable
- public DataRow read(SearchRow key) throws StorageException {
- try {
- byte[] valueBytes = data.get(partitionKey(key));
-
- return valueBytes == null ? null : new DelegatingDataRow(key, valueBytes);
- } catch (RocksDBException e) {
- throw new StorageException("Failed to read data from the storage", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> readAll(List<? extends SearchRow> keys) throws StorageException {
- int resultSize = keys.size();
-
- List<byte[]> values;
-
- try {
- values = db.multiGetAsList(nCopies(resultSize, data.handle()), getKeys(keys));
- } catch (RocksDBException e) {
- throw new StorageException("Failed to read data from the storage", e);
- }
-
- assert resultSize == values.size();
-
- List<DataRow> res = new ArrayList<>(resultSize);
-
- for (int i = 0; i < resultSize; i++) {
- byte[] value = values.get(i);
-
- if (value != null) {
- res.add(new DelegatingDataRow(keys.get(i), value));
- }
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override
- public void write(DataRow row) throws StorageException {
- try {
- byte[] value = row.valueBytes();
-
- assert value != null;
-
- data.put(partitionKey(row), value);
- } catch (RocksDBException e) {
- throw new StorageException("Filed to write data to the storage", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void writeAll(List<? extends DataRow> rows) throws StorageException {
- try (WriteBatch batch = new WriteBatch();
- WriteOptions opts = new WriteOptions()) {
- for (DataRow row : rows) {
- byte[] value = row.valueBytes();
-
- assert value != null;
-
- data.put(batch, partitionKey(row), value);
- }
-
- db.write(opts, batch);
- } catch (RocksDBException e) {
- throw new StorageException("Filed to write data to the storage", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws StorageException {
- List<DataRow> cantInsert = new ArrayList<>();
-
- try (var batch = new WriteBatch();
- var opts = new WriteOptions()) {
-
- for (DataRow row : rows) {
- byte[] partitionKey = partitionKey(row);
-
- if (data.get(partitionKey) == null) {
- byte[] value = row.valueBytes();
-
- assert value != null;
-
- data.put(batch, partitionKey, value);
- } else {
- cantInsert.add(row);
- }
- }
-
- db.write(opts, batch);
- } catch (RocksDBException e) {
- throw new StorageException("Filed to write data to the storage", e);
- }
-
- return cantInsert;
- }
-
- /** {@inheritDoc} */
- @Override
- public void remove(SearchRow key) throws StorageException {
- try {
- data.delete(partitionKey(key));
- } catch (RocksDBException e) {
- throw new StorageException("Failed to remove data from the storage", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<SearchRow> removeAll(List<? extends SearchRow> keys) {
- List<SearchRow> skippedRows = new ArrayList<>();
-
- try (var batch = new WriteBatch();
- var opts = new WriteOptions()) {
-
- for (SearchRow key : keys) {
- byte[] partitionKey = partitionKey(key);
-
- byte[] value = data.get(partitionKey);
-
- if (value != null) {
- data.delete(batch, partitionKey);
- } else {
- skippedRows.add(key);
- }
- }
-
- db.write(opts, batch);
- } catch (RocksDBException e) {
- throw new StorageException("Failed to remove data from the storage", e);
- }
-
- return skippedRows;
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<DataRow> removeAllExact(List<? extends DataRow> keyValues) {
- List<DataRow> skippedRows = new ArrayList<>();
-
- try (WriteBatch batch = new WriteBatch();
- WriteOptions opts = new WriteOptions()) {
-
- List<byte[]> keys = getKeys(keyValues);
- List<byte[]> values = db.multiGetAsList(nCopies(keys.size(), data.handle()), keys);
-
- assert values.size() == keys.size();
-
- for (int i = 0; i < keys.size(); i++) {
- byte[] key = keys.get(i);
- byte[] expectedValue = keyValues.get(i).valueBytes();
- byte[] value = values.get(i);
-
- if (Arrays.equals(value, expectedValue)) {
- data.delete(batch, key);
- } else {
- skippedRows.add(keyValues.get(i));
- }
- }
-
- db.write(opts, batch);
- } catch (RocksDBException e) {
- throw new StorageException("Failed to remove data from the storage", e);
- }
-
- return skippedRows;
- }
-
- /** {@inheritDoc} */
- @Nullable
- @Override
- public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
- try {
- byte[] partitionKey = partitionKey(key);
-
- byte[] existingDataBytes = data.get(partitionKey);
-
- clo.call(existingDataBytes == null ? null : new DelegatingDataRow(key, existingDataBytes));
-
- switch (clo.operationType()) {
- case WRITE:
- DataRow newRow = clo.newRow();
-
- assert newRow != null;
-
- byte[] value = newRow.valueBytes();
-
- assert value != null;
-
- data.put(partitionKey, value);
-
- break;
-
- case REMOVE:
- data.delete(partitionKey);
-
- break;
-
- case NOOP:
- break;
-
- default:
- throw new UnsupportedOperationException(String.valueOf(clo.operationType()));
- }
-
- return clo.result();
- } catch (RocksDBException e) {
- throw new StorageException("Failed to access data in the storage", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
- var upperBound = new Slice(partitionEndPrefix());
-
- var options = new ReadOptions().setIterateUpperBound(upperBound);
-
- RocksIterator it = data.newIterator(options);
-
- it.seek(partitionStartPrefix());
-
- return new ScanCursor(it, filter) {
- @Override
- public void close() throws Exception {
- super.close();
-
- IgniteUtils.closeAll(options, upperBound);
- }
- };
- }
-
- // TODO IGNITE-16769 Implement correct PartitionStorage rows count calculation.
- @Override
- public long rowsCount() {
- var upperBound = new Slice(partitionEndPrefix());
-
- var options = new ReadOptions().setIterateUpperBound(upperBound);
-
- RocksIterator it = data.newIterator(options);
-
- it.seek(partitionStartPrefix());
-
- long size = 0;
-
- while (it.isValid()) {
- ++size;
- it.next();
- }
-
- try {
- IgniteUtils.closeAll(options, upperBound);
- } catch (Exception e) {
- throw new StorageException("Error occurred while fetching the size.", e);
- }
-
- return size;
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<Void> snapshot(Path snapshotPath) {
- return snapshotManager.createSnapshot(snapshotPath);
- }
-
- /** {@inheritDoc} */
- @Override
- public void restoreSnapshot(Path path) {
- synchronized (snapshotRestoreLock) {
- destroy();
-
- snapshotManager.restoreSnapshot(path);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void close() throws Exception {
- // nothing to do
- }
-
- @Override
- public void destroy() {
- try {
- data.deleteRange(partitionStartPrefix(), partitionEndPrefix());
- } catch (RocksDBException e) {
- throw new StorageException("Unable to delete partition " + partId, e);
- }
- }
-
- /**
- * Creates a prefix of all keys in the given partition.
- */
- private byte[] partitionStartPrefix() {
- return unsignedShortAsBytes(partId);
- }
-
- /**
- * Creates a prefix of all keys in the next partition, used as an exclusive bound.
- */
- private byte[] partitionEndPrefix() {
- return unsignedShortAsBytes(partId + 1);
- }
-
- private static byte[] unsignedShortAsBytes(int value) {
- byte[] result = new byte[Short.BYTES];
-
- result[0] = (byte) (value >>> 8);
- result[1] = (byte) value;
-
- return result;
- }
-
- /** Cursor wrapper over the RocksIterator object with custom filter. */
- private static class ScanCursor extends RocksIteratorAdapter<DataRow> {
- /** Custom filter predicate. */
- private final Predicate<SearchRow> filter;
-
- /**
- * Constructor.
- *
- * @param iter Iterator.
- * @param filter Filter.
- */
- private ScanCursor(RocksIterator iter, Predicate<SearchRow> filter) {
- super(iter);
-
- this.filter = filter;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- while (super.hasNext() && !filter.test(decodeEntry(it.key(), it.value()))) {
- it.next();
- }
-
- return super.hasNext();
- }
-
- @Override
- protected DataRow decodeEntry(byte[] key, byte[] value) {
- byte[] rowKey = Arrays.copyOfRange(key, PARTITION_KEY_PREFIX_SIZE, key.length);
-
- return new SimpleDataRow(rowKey, value);
- }
- }
-
- /**
- * Creates a key used in this partition storage by prepending a partition ID (to distinguish between different partition data)
- * and the key's hash (an optimisation).
- */
- private byte[] partitionKey(SearchRow key) {
- ByteBuffer keyBuffer = key.key();
-
- return ByteBuffer.allocate(PARTITION_KEY_PREFIX_SIZE + keyBuffer.limit())
- .order(ByteOrder.BIG_ENDIAN)
- .putShort((short) partId)
- // TODO: use precomputed hash, see https://issues.apache.org/jira/browse/IGNITE-16370
- .putInt(StorageUtils.hashCode(keyBuffer))
- .put(keyBuffer)
- .array();
- }
-
- /**
- * Gets a list of key byte arrays.
- *
- * @param keyValues Key rows.
- * @return List of keys as byte arrays.
- */
- private List<byte[]> getKeys(List<? extends SearchRow> keyValues) {
- List<byte[]> keys = new ArrayList<>(keyValues.size());
-
- for (SearchRow keyValue : keyValues) {
- keys.add(partitionKey(keyValue));
- }
-
- return keys;
- }
-}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 05192a7..dbe98a7 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -34,9 +34,7 @@
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfiguration;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionView;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
@@ -148,7 +146,7 @@
/** {@inheritDoc} */
@Override
- public TableStorage createTable(TableConfiguration tableCfg) throws StorageException {
+ public RocksDbTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
TableView tableView = tableCfg.value();
assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
@@ -167,9 +165,4 @@
return new RocksDbTableStorage(this, tablePath, tableCfg, dataRegion);
}
-
- @Override
- public MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
- return (MvTableStorage) createTable(tableCfg);
- }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 5e2d16d..ca6207f 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -27,8 +27,13 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -38,10 +43,8 @@
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -60,7 +63,7 @@
/**
* Table storage implementation based on {@link RocksDB} instance.
*/
-class RocksDbTableStorage implements TableStorage, MvTableStorage {
+class RocksDbTableStorage implements MvTableStorage {
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(RocksDbTableStorage.class);
@@ -97,6 +100,15 @@
/** Partition storages. */
private volatile AtomicReferenceArray<RocksDbMvPartitionStorage> partitions;
+ /** Map with flush futures by sequence number at the time of the {@link #awaitFlush(boolean)} call. */
+ private final ConcurrentMap<Long, CompletableFuture<Void>> flushFuturesBySequenceNumber = new ConcurrentHashMap<>();
+
+ /** Latest known sequence number for persisted data. Not volatile, protected by explicit synchronization. */
+ private long latestPersistedSequenceNumber;
+
+ /** Mutex for {@link #latestPersistedSequenceNumber} modifications. */
+ private final Object latestPersistedSequenceNumberMux = new Object();
+
/**
* Instance of the latest scheduled flush closure.
*
@@ -104,10 +116,6 @@
*/
private volatile Runnable latestFlushClosure;
- /** Flag indicating if the storage has been stopped. */
- @Deprecated
- private volatile boolean stopped = false;
-
//TODO Use it instead of the "stopped" flag.
/** Busy lock to stop synchronously. */
final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -212,6 +220,11 @@
throw new StorageException("Unidentified column family [name=" + cf.name() + ", table=" + tableCfg.name() + ']');
}
}
+
+ // Pointless synchronization, but without it there would be a warning in the code.
+ synchronized (latestPersistedSequenceNumberMux) {
+ latestPersistedSequenceNumber = db.getLatestSequenceNumber();
+ }
} catch (RocksDBException e) {
throw new StorageException("Failed to initialize RocksDB instance", e);
}
@@ -224,9 +237,62 @@
}
/**
+ * Returns a future to wait next flush operation from the current point in time. Uses {@link RocksDB#getLatestSequenceNumber()} to
+ * achieve this.
+ *
+ * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggerred in the near future.
+ *
+ * @see #scheduleFlush()
+ */
+ public CompletableFuture<Void> awaitFlush(boolean schedule) {
+ CompletableFuture<Void> future;
+
+ long dbSequenceNumber = db.getLatestSequenceNumber();
+
+ synchronized (latestPersistedSequenceNumberMux) {
+ if (dbSequenceNumber <= latestPersistedSequenceNumber) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ future = flushFuturesBySequenceNumber.computeIfAbsent(dbSequenceNumber, l -> new CompletableFuture<>());
+ }
+
+ if (schedule) {
+ scheduleFlush();
+ }
+
+ return future;
+ }
+
+ /**
+ * Completes all futures in {@link #flushFuturesBySequenceNumber} up to a given sequence number.
+ */
+ void completeFutures(long sequenceNumber) {
+ synchronized (latestPersistedSequenceNumberMux) {
+ if (sequenceNumber <= latestPersistedSequenceNumber) {
+ return;
+ }
+
+ latestPersistedSequenceNumber = sequenceNumber;
+ }
+
+ Set<Entry<Long, CompletableFuture<Void>>> entries = flushFuturesBySequenceNumber.entrySet();
+
+ for (Iterator<Entry<Long, CompletableFuture<Void>>> iterator = entries.iterator(); iterator.hasNext(); ) {
+ Entry<Long, CompletableFuture<Void>> entry = iterator.next();
+
+ if (sequenceNumber >= entry.getKey()) {
+ entry.getValue().complete(null);
+
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
* Schedules a flush of the table. If run several times within a small amount of time, only the last scheduled flush will be executed.
*/
- public void scheduleFlush() {
+ void scheduleFlush() {
Runnable newClosure = new Runnable() {
@Override
public void run() {
@@ -235,7 +301,9 @@
}
try {
- db.flush(flushOptions);
+ // Explicit list of CF handles is mandatory!
+ // Default flush is buggy and only invokes listener methods for a single random CF.
+ db.flush(flushOptions, List.of(metaCfHandle(), partitionCfHandle()));
} catch (RocksDBException e) {
LOG.error("Error occurred during the explicit flush for table '{}'", e, tableCfg.name());
}
@@ -253,14 +321,16 @@
/** {@inheritDoc} */
@Override
public void stop() throws StorageException {
- stopped = true;
-
if (!stopGuard.compareAndSet(false, true)) {
return;
}
busyLock.block();
+ for (CompletableFuture<Void> future : flushFuturesBySequenceNumber.values()) {
+ future.cancel(false);
+ }
+
List<AutoCloseable> resources = new ArrayList<>();
resources.add(db);
@@ -296,25 +366,6 @@
/** {@inheritDoc} */
@Override
- public PartitionStorage getOrCreatePartition(int partId) throws StorageException {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Nullable
- @Override
- public PartitionStorage getPartition(int partId) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public void dropPartition(int partId) throws StorageException {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
public RocksDbMvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
RocksDbMvPartitionStorage partition = getMvPartition(partitionId);
@@ -351,7 +402,7 @@
mvPartition.destroy();
// Wait for the data to actually be removed from the disk and close the storage.
- return mvPartition.flush()
+ return awaitFlush(false)
.whenComplete((v, e) -> {
partitions.set(partitionId, null);
@@ -363,17 +414,19 @@
});
}
+ /** {@inheritDoc} */
@Override
public void createIndex(String indexName) {
throw new UnsupportedOperationException("Not implemented yet");
}
+ /** {@inheritDoc} */
@Override
- @Nullable
- public SortedIndexStorage getSortedIndex(int partitionId, String indexName) {
+ public @Nullable SortedIndexStorage getSortedIndex(int partitionId, String indexName) {
throw new UnsupportedOperationException("Not implemented yet");
}
+ /** {@inheritDoc} */
@Override
public CompletableFuture<Void> destroyIndex(String indexName) {
throw new UnsupportedOperationException("Not implemented yet");
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index b1ec8a7..31b560e 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -44,7 +44,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
/**
- * Storage test implementation for {@link RocksDbPartitionStorage}.
+ * Storage test implementation for {@link RocksDbMvPartitionStorage}.
*/
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
index ddf3b6c..d42fb15 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
@@ -47,8 +47,6 @@
import org.apache.ignite.internal.storage.BaseMvStoragesTest;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageChange;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
@@ -70,17 +68,19 @@
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
public class RocksDbTableStorageTest extends BaseMvStoragesTest {
- private StorageEngine engine;
+ private RocksDbStorageEngine engine;
- private MvTableStorage storage;
+ private RocksDbTableStorage storage;
@BeforeEach
public void setUp(
@WorkDirectory Path workDir,
- @InjectConfiguration RocksDbStorageEngineConfiguration rocksDbEngineConfig,
+ @InjectConfiguration(
+ value = "mock {flushDelayMillis = 0, defaultRegion {size = 16536, writeBufferSize = 16536}}"
+ ) RocksDbStorageEngineConfiguration rocksDbEngineConfig,
@InjectConfiguration(
name = "table",
- value = "mock.partitions = 1024",
+ value = "mock.partitions = 512",
polymorphicExtensions = {
HashIndexConfigurationSchema.class,
UnknownDataStorageConfigurationSchema.class,
@@ -97,15 +97,6 @@
assertThat(((RocksDbDataStorageView) tableCfg.dataStorage().value()).dataRegion(), equalTo(DEFAULT_DATA_REGION_NAME));
- CompletableFuture<Void> changeEngineFuture = rocksDbEngineConfig.defaultRegion()
- .change(c -> c.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024));
-
- assertThat(changeEngineFuture, willBe(nullValue(Void.class)));
-
- changeEngineFuture = tableCfg.change(cfg -> cfg.changePartitions(512));
-
- assertThat(changeEngineFuture, willBe(nullValue(Void.class)));
-
engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);
engine.start();
@@ -126,7 +117,7 @@
}
/**
- * Tests that {@link RocksDbTableStorage#getPartition} correctly returns an existing partition.
+ * Tests that {@link RocksDbTableStorage#getMvPartition(int)} correctly returns an existing partition.
*/
@Test
void testCreatePartition() {
@@ -205,7 +196,12 @@
RowId rowId1 = partitionStorage1.runConsistently(() -> partitionStorage1.insert(testData, txId));
- assertThat(storage.destroyPartition(42), willCompleteSuccessfully());
+ CompletableFuture<Void> destroyFuture = storage.destroyPartition(42);
+
+ // Partition desctuction doesn't enforce flush.
+ storage.scheduleFlush();
+
+ assertThat(destroyFuture, willCompleteSuccessfully());
assertThat(storage.getMvPartition(42), is(nullValue()));
assertThat(storage.getOrCreateMvPartition(42).read(rowId0, txId), is(nullValue()));
@@ -248,8 +244,7 @@
assertThat(storage.isVolatile(), is(false));
}
- @Nullable
- private static IgniteBiTuple<TestKey, TestValue> unwrap(@Nullable BinaryRow binaryRow) {
+ private static @Nullable IgniteBiTuple<TestKey, TestValue> unwrap(@Nullable BinaryRow binaryRow) {
if (binaryRow == null) {
return null;
}