IGNITE-16392 PageMemory data regions configuration; porting of PageMemoryNoStoreImpl and all other necessary components. (#591)
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/CoreDistributedConfigurationModule.java b/modules/api/src/main/java/org/apache/ignite/configuration/CoreDistributedConfigurationModule.java
index e47b02f..e0639e0 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/CoreDistributedConfigurationModule.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/CoreDistributedConfigurationModule.java
@@ -22,6 +22,9 @@
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
@@ -54,7 +57,10 @@
return List.of(
HashIndexConfigurationSchema.class,
SortedIndexConfigurationSchema.class,
- PartialIndexConfigurationSchema.class
+ PartialIndexConfigurationSchema.class,
+ RocksDbDataRegionConfigurationSchema.class,
+ PageMemoryDataRegionConfigurationSchema.class,
+ UnsafeMemoryAllocatorConfigurationSchema.class
);
}
}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
index f32381f..fe60b92 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
@@ -17,48 +17,24 @@
package org.apache.ignite.configuration.schemas.store;
-import org.apache.ignite.configuration.annotation.Config;
-import org.apache.ignite.configuration.annotation.Value;
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
+
+import org.apache.ignite.configuration.annotation.InjectedName;
+import org.apache.ignite.configuration.annotation.PolymorphicConfig;
+import org.apache.ignite.configuration.annotation.PolymorphicId;
import org.apache.ignite.configuration.validation.Immutable;
-import org.apache.ignite.configuration.validation.Min;
-import org.apache.ignite.configuration.validation.OneOf;
/**
- * Configuration schema for data region. Currently it represents configuration for rocksdb storage engine only.
+ * Configuration schema for data region.
*/
-@Config
+@PolymorphicConfig
public class DataRegionConfigurationSchema {
- /** Type of the RocksDB data region. */
- public static final String ROCKSDB_DATA_REGION_TYPE = "rocksdb";
-
- /** Cache type for the RocksDB LRU cache. */
- public static final String ROCKSDB_LRU_CACHE = "lru";
-
- /** Cache type for the RocksDB LRU cache. */
- public static final String ROCKSDB_CLOCK_CACHE = "clock";
-
/** Type for the future polymorphic configuration schemas. */
@Immutable
- @OneOf(ROCKSDB_DATA_REGION_TYPE)
- @Value(hasDefault = true)
+ @PolymorphicId(hasDefault = true)
public String type = ROCKSDB_DATA_REGION_TYPE;
- /** Size of the rocksdb offheap cache. */
- @Value(hasDefault = true)
- public long size = 256 * 1024 * 1024;
-
- /** Size of rocksdb write buffer. */
- @Value(hasDefault = true)
- @Min(1)
- public long writeBufferSize = 64 * 1024 * 1024;
-
- /** Cache type - only {@code LRU} is supported at the moment. {@code Clock} implementation has known bugs. */
- @OneOf({ROCKSDB_LRU_CACHE})
- @Value(hasDefault = true)
- public String cache = ROCKSDB_LRU_CACHE;
-
- /** The cache is sharded to 2^numShardBits shards, by hash of the key. */
- @Min(-1)
- @Value(hasDefault = true)
- public int numShardBits = -1;
+ /** Name of the data region. */
+ @InjectedName
+ public String name;
}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
index aff2f1e..a10858a 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
@@ -20,6 +20,7 @@
import org.apache.ignite.configuration.annotation.ConfigValue;
import org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Name;
import org.apache.ignite.configuration.annotation.NamedConfigValue;
import org.apache.ignite.configuration.validation.ExceptKeys;
@@ -33,6 +34,7 @@
/** Default data region. */
@ConfigValue
+ @Name(DEFAULT_DATA_REGION_NAME)
public DataRegionConfigurationSchema defaultRegion;
/** Other data regions. */
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/MemoryAllocatorConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/MemoryAllocatorConfigurationSchema.java
new file mode 100644
index 0000000..6ca6a73
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/MemoryAllocatorConfigurationSchema.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import static org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema.UNSAFE_MEMORY_ALLOCATOR_TYPE;
+
+import org.apache.ignite.configuration.annotation.PolymorphicConfig;
+import org.apache.ignite.configuration.annotation.PolymorphicId;
+
+/**
+ * Configuration schema for memory allocation strategies.
+ */
+@PolymorphicConfig
+public class MemoryAllocatorConfigurationSchema {
+ @PolymorphicId(hasDefault = true)
+ public String type = UNSAFE_MEMORY_ALLOCATOR_TYPE;
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/PageMemoryDataRegionConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/PageMemoryDataRegionConfigurationSchema.java
new file mode 100644
index 0000000..eb58218
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/PageMemoryDataRegionConfigurationSchema.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.PAGE_MEMORY_DATA_REGION_TYPE;
+
+import org.apache.ignite.configuration.annotation.ConfigValue;
+import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Immutable;
+import org.apache.ignite.configuration.validation.OneOf;
+
+/**
+ * Data region configuration for Page Memory storage engine.
+ */
+@PolymorphicConfigInstance(PAGE_MEMORY_DATA_REGION_TYPE)
+public class PageMemoryDataRegionConfigurationSchema extends DataRegionConfigurationSchema {
+ /** Type of the Page Memory data region. */
+ public static final String PAGE_MEMORY_DATA_REGION_TYPE = "pagemem";
+
+ /** Default initial size. */
+ public static final long DFLT_DATA_REGION_INITIAL_SIZE = 256 * 1024 * 1024;
+
+ /** Default max size. */
+ public static final long DFLT_DATA_REGION_MAX_SIZE = 256 * 1024 * 1024;
+
+ /** Eviction is disabled. */
+ public static final String DISABLED_EVICTION_MODE = "DISABLED";
+
+ /** Random-LRU algorithm. */
+ public static final String RANDOM_LRU_EVICTION_MODE = "RANDOM_LRU";
+
+ /** Random-2-LRU algorithm: scan-resistant version of Random-LRU. */
+ public static final String RANDOM_2_LRU_EVICTION_MODE = "RANDOM_2_LRU";
+
+ /** Random-LRU algorithm. */
+ public static final String RANDOM_LRU_REPLACEMENT_MODE = "RANDOM_LRU";
+
+ /** Segmented-LRU algorithm. */
+ public static final String SEGMENTED_LRU_REPLACEMENT_MODE = "SEGMENTED_LRU";
+
+ /** CLOCK algorithm. */
+ public static final String CLOCK_REPLACEMENT_MODE = "CLOCK";
+
+ @Immutable
+ @Value(hasDefault = true)
+ public int pageSize = 16 * 1024;
+
+ @Value(hasDefault = true)
+ public boolean persistent = false;
+
+ @Value(hasDefault = true)
+ public long initSize = DFLT_DATA_REGION_INITIAL_SIZE;
+
+ @Value(hasDefault = true)
+ public long maxSize = DFLT_DATA_REGION_MAX_SIZE;
+
+ @ConfigValue
+ public MemoryAllocatorConfigurationSchema memoryAllocator;
+
+ @OneOf({DISABLED_EVICTION_MODE, RANDOM_LRU_EVICTION_MODE, RANDOM_2_LRU_EVICTION_MODE})
+ @Value(hasDefault = true)
+ public String evictionMode = DISABLED_EVICTION_MODE;
+
+ @OneOf({RANDOM_LRU_REPLACEMENT_MODE, SEGMENTED_LRU_REPLACEMENT_MODE, CLOCK_REPLACEMENT_MODE})
+ @Value(hasDefault = true)
+ public String replacementMode = CLOCK_REPLACEMENT_MODE;
+
+ @Value(hasDefault = true)
+ public double evictionThreshold = 0.9;
+
+ @Value(hasDefault = true)
+ public int emptyPagesPoolSize = 100;
+
+ @Value(hasDefault = true)
+ public long checkpointPageBufSize = 0;
+
+ @Value(hasDefault = true)
+ public boolean lazyMemoryAllocation = true;
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/RocksDbDataRegionConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/RocksDbDataRegionConfigurationSchema.java
new file mode 100644
index 0000000..247e935
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/RocksDbDataRegionConfigurationSchema.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
+
+import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Min;
+import org.apache.ignite.configuration.validation.OneOf;
+
+/**
+ * Data region configuration for rocksdb storage engine.
+ */
+@PolymorphicConfigInstance(ROCKSDB_DATA_REGION_TYPE)
+public class RocksDbDataRegionConfigurationSchema extends DataRegionConfigurationSchema {
+ /** Type of the RocksDB data region. */
+ public static final String ROCKSDB_DATA_REGION_TYPE = "rocksdb";
+
+ /** Cache type for the RocksDB LRU cache. */
+ public static final String ROCKSDB_LRU_CACHE = "lru";
+
+ /** Cache type for the RocksDB LRU cache. */
+ public static final String ROCKSDB_CLOCK_CACHE = "clock";
+
+ /** Size of the rocksdb offheap cache. */
+ @Value(hasDefault = true)
+ public long size = 256 * 1024 * 1024;
+
+ /** Size of rocksdb write buffer. */
+ @Value(hasDefault = true)
+ @Min(1)
+ public long writeBufferSize = 64 * 1024 * 1024;
+
+ /** Cache type - only {@code LRU} is supported at the moment. {@code Clock} implementation has known bugs. */
+ @OneOf(ROCKSDB_LRU_CACHE)
+ @Value(hasDefault = true)
+ public String cache = ROCKSDB_LRU_CACHE;
+
+ /** The cache is sharded to 2^numShardBits shards, by hash of the key. */
+ @Min(-1)
+ @Value(hasDefault = true)
+ public int numShardBits = -1;
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/UnsafeMemoryAllocatorConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/UnsafeMemoryAllocatorConfigurationSchema.java
new file mode 100644
index 0000000..d31b813
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/UnsafeMemoryAllocatorConfigurationSchema.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import static org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema.UNSAFE_MEMORY_ALLOCATOR_TYPE;
+
+import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
+
+/**
+ * Memory allocator that allocates data in offheap using {@link sun.misc.Unsafe}.
+ */
+@PolymorphicConfigInstance(UNSAFE_MEMORY_ALLOCATOR_TYPE)
+public class UnsafeMemoryAllocatorConfigurationSchema extends MemoryAllocatorConfigurationSchema {
+ public static final String UNSAFE_MEMORY_ALLOCATOR_TYPE = "unsafe";
+}
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationTestUtils.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationTestUtils.java
new file mode 100644
index 0000000..5f73467
--- /dev/null
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationTestUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import org.apache.ignite.configuration.ConfigurationTree;
+
+/**
+ * Utility methods, related to testing configurations.
+ */
+public class ConfigurationTestUtils {
+ /**
+ * Casts dynamic configuration to its specific type. Can be applied to polymorphic configuration instances.
+ */
+ public static <C extends ConfigurationTree> C fixConfiguration(C cfg) {
+ return (C) ((DynamicConfiguration) cfg).specificConfigTree();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 4aa50af..2d37a5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -61,6 +61,12 @@
/** Unsafe. */
private static final Unsafe UNSAFE = unsafe();
+ /** Page size. */
+ private static final int PAGE_SIZE = UNSAFE.pageSize();
+
+ /** Empty page. */
+ private static final byte[] EMPTY_PAGE = new byte[PAGE_SIZE];
+
/** Unaligned flag. */
private static final boolean UNALIGNED = unaligned();
@@ -1258,6 +1264,24 @@
}
/**
+ * Fills memory with zeroes.
+ *
+ * @param addr Address.
+ * @param len Length.
+ */
+ public static void zeroMemory(long addr, long len) {
+ long off = 0;
+
+ for (; off + PAGE_SIZE <= len; off += PAGE_SIZE) {
+ GridUnsafe.copyMemory(EMPTY_PAGE, GridUnsafe.BYTE_ARR_OFF, null, addr + off, PAGE_SIZE);
+ }
+
+ if (len != off) {
+ GridUnsafe.copyMemory(EMPTY_PAGE, GridUnsafe.BYTE_ARR_OFF, null, addr + off, len - off);
+ }
+ }
+
+ /**
* Copy memory between offheap locations.
*
* @param srcAddr Source address.
@@ -1499,7 +1523,7 @@
* @return Page size.
*/
public static int pageSize() {
- return UNSAFE.pageSize();
+ return PAGE_SIZE;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 355c75b..3a9cd8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -285,6 +285,27 @@
}
/**
+ * Returns size in human-readable format.
+ *
+ * @param bytes Number of bytes to display.
+ * @param si If {@code true}, then unit base is 1000, otherwise unit base is 1024.
+ * @return Formatted size.
+ */
+ public static String readableSize(long bytes, boolean si) {
+ int unit = si ? 1000 : 1024;
+
+ if (bytes < unit) {
+ return bytes + " B";
+ }
+
+ int exp = (int) (Math.log(bytes) / Math.log(unit));
+
+ String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i");
+
+ return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
+ }
+
+ /**
* Gets absolute value for integer. If integer is {@link Integer#MIN_VALUE}, then {@code 0} is returned.
*
* @param i Integer.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
new file mode 100644
index 0000000..f04f470
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.lang.IgniteSystemProperties;
+
+/**
+ * Lock state structure is as follows.
+ * <pre>
+ * +----------------+---------------+---------+----------+
+ * | WRITE WAIT CNT | READ WAIT CNT | TAG | LOCK CNT |
+ * +----------------+---------------+---------+----------+
+ * | 2 bytes | 2 bytes | 2 bytes | 2 bytes |
+ * +----------------+---------------+---------+----------+
+ * </pre>
+ */
+public class OffheapReadWriteLock {
+ /**
+ * Default empirical value for the spin count.
+ *
+ * @see #IGNITE_OFFHEAP_RWLOCK_SPIN_COUNT
+ */
+ private static final int DFLT_OFFHEAP_RWLOCK_SPIN_COUNT = 32;
+
+ /** A number of spin-lock iterations to take before falling back to the blocking approach. */
+ private static final String IGNITE_OFFHEAP_RWLOCK_SPIN_COUNT = "IGNITE_OFFHEAP_RWLOCK_SPIN_COUNT";
+
+ /** Count of spins before the fallback to {@link ReentrantLock} and {@link Condition}. */
+ private static final int SPIN_CNT = IgniteSystemProperties.getInteger(IGNITE_OFFHEAP_RWLOCK_SPIN_COUNT, DFLT_OFFHEAP_RWLOCK_SPIN_COUNT);
+
+ /** Always lock tag. */
+ public static final int TAG_LOCK_ALWAYS = -1;
+
+ /** Lock size. */
+ public static final int LOCK_SIZE = 8;
+
+ /** Maximum number of waiting threads, read or write. */
+ public static final int MAX_WAITERS = 0xFFFF;
+
+ /** Striped locks array. */
+ private final ReentrantLock[] locks;
+
+ /** Conditions for reads. */
+ private final Condition[] readConditions;
+
+ /** Conditions for write. */
+ private final Condition[] writeConditions;
+
+ /** Mask to extract stripe index from the hash. */
+ private final int monitorsMask;
+
+ /**
+ * Constructor.
+ *
+ * @param concLvl Concurrency level, must be a power of two.
+ */
+ public OffheapReadWriteLock(int concLvl) {
+ if ((concLvl & concLvl - 1) != 0) {
+ throw new IllegalArgumentException("Concurrency level must be a power of 2: " + concLvl);
+ }
+
+ monitorsMask = concLvl - 1;
+
+ locks = new ReentrantLock[concLvl];
+ readConditions = new Condition[concLvl];
+ writeConditions = new Condition[concLvl];
+
+ for (int i = 0; i < locks.length; i++) {
+ ReentrantLock lock = new ReentrantLock();
+
+ locks[i] = lock;
+ readConditions[i] = lock.newCondition();
+ writeConditions[i] = lock.newCondition();
+ }
+ }
+
+ /**
+ * Initializes the lock.
+ *
+ * @param lock Lock pointer to initialize.
+ */
+ public void init(long lock, int tag) {
+ tag &= 0xFFFF;
+
+ assert tag != 0;
+
+ GridUnsafe.putLong(lock, (long) tag << 16);
+ }
+
+ /**
+ * Acquires a read lock.
+ *
+ * @param lock Lock address.
+ */
+ public boolean readLock(long lock, int tag) {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ assert state != 0;
+
+ // Check write waiters first.
+ int writeWaitCnt = writersWaitCount(state);
+
+ if (writeWaitCnt == 0) {
+ for (int i = 0; i < SPIN_CNT; i++) {
+ if (!checkTag(state, tag)) {
+ return false;
+ }
+
+ if (canReadLock(state)) {
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, 1, 0, 0))) {
+ return true;
+ } else {
+ // Retry CAS, do not count as spin cycle.
+ i--;
+ }
+ }
+
+ state = GridUnsafe.getLongVolatile(null, lock);
+ }
+ }
+
+ int idx = lockIndex(lock);
+
+ ReentrantLock lockObj = locks[idx];
+
+ lockObj.lock();
+
+ try {
+ updateReadersWaitCount(lock, lockObj, 1);
+
+ return waitAcquireReadLock(lock, idx, tag);
+ } finally {
+ lockObj.unlock();
+ }
+ }
+
+ /**
+ * Releases read lock.
+ *
+ * @param lock Lock address.
+ */
+ public void readUnlock(long lock) {
+ while (true) {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ if (lockCount(state) <= 0) {
+ throw new IllegalMonitorStateException("Attempted to release a read lock while not holding it "
+ + "[lock=" + IgniteUtils.hexLong(lock) + ", state=" + IgniteUtils.hexLong(state) + ']');
+ }
+
+ long updated = updateState(state, -1, 0, 0);
+
+ assert updated != 0;
+
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+ // Notify monitor if we were CASed to zero and there is a write waiter.
+ if (lockCount(updated) == 0 && writersWaitCount(updated) > 0) {
+ int idx = lockIndex(lock);
+
+ ReentrantLock lockObj = locks[idx];
+
+ lockObj.lock();
+
+ try {
+ // Note that we signal all waiters for this stripe. Since not all waiters in this
+ // stripe/index belong to this particular lock, we can't wake up just one of them.
+ writeConditions[idx].signalAll();
+ } finally {
+ lockObj.unlock();
+ }
+ }
+
+ return;
+ }
+ }
+ }
+
+ /**
+ * Tries to acquire a write lock.
+ *
+ * @param lock Lock address.
+ */
+ public boolean tryWriteLock(long lock, int tag) {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ return checkTag(state, tag) && canWriteLock(state)
+ && GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -1, 0, 0));
+ }
+
+ /**
+ * Acquires a write lock.
+ *
+ * @param lock Lock address.
+ */
+ public boolean writeLock(long lock, int tag) {
+ assert tag != 0;
+
+ for (int i = 0; i < SPIN_CNT; i++) {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ assert state != 0;
+
+ if (!checkTag(state, tag)) {
+ return false;
+ }
+
+ if (canWriteLock(state)) {
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -1, 0, 0))) {
+ return true;
+ } else {
+ // Retry CAS, do not count as spin cycle.
+ i--;
+ }
+ }
+ }
+
+ int idx = lockIndex(lock);
+
+ ReentrantLock lockObj = locks[idx];
+
+ lockObj.lock();
+
+ try {
+ updateWritersWaitCount(lock, lockObj, 1);
+
+ return waitAcquireWriteLock(lock, idx, tag);
+ } finally {
+ lockObj.unlock();
+ }
+ }
+
+ /**
+ * Checks whether write lock is acquired.
+ *
+ * @param lock Lock to check.
+ * @return {@code True} if write lock is held by any thread for the given offheap RW lock.
+ */
+ public boolean isWriteLocked(long lock) {
+ return lockCount(GridUnsafe.getLongVolatile(null, lock)) == -1;
+ }
+
+ /**
+ * Checks whether read lock is acquired.
+ *
+ * @param lock Lock to check.
+ * @return {@code True} if at least one read lock is held by any thread for the given offheap RW lock.
+ */
+ public boolean isReadLocked(long lock) {
+ return lockCount(GridUnsafe.getLongVolatile(null, lock)) > 0;
+ }
+
+ /**
+ * Releases write lock.
+ *
+ * @param lock Lock address.
+ */
+ public void writeUnlock(long lock, int tag) {
+ long updated;
+
+ assert tag != 0;
+
+ while (true) {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ if (lockCount(state) != -1) {
+ throw new IllegalMonitorStateException("Attempted to release write lock while not holding it "
+ + "[lock=" + IgniteUtils.hexLong(lock) + ", state=" + IgniteUtils.hexLong(state) + ']');
+ }
+
+ updated = releaseWithTag(state, tag);
+
+ assert updated != 0;
+
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+ break;
+ }
+ }
+
+ int writeWaitCnt = writersWaitCount(updated);
+ int readWaitCnt = readersWaitCount(updated);
+
+ if (writeWaitCnt > 0 || readWaitCnt > 0) {
+ int idx = lockIndex(lock);
+
+ ReentrantLock lockObj = locks[idx];
+
+ lockObj.lock();
+
+ try {
+ signalNextWaiter(writeWaitCnt, idx);
+ } finally {
+ lockObj.unlock();
+ }
+ }
+ }
+
+ /**
+ * Signals readers or writers depending on a counter value.
+ *
+ * @param writeWaitCnt Writers wait count.
+ * @param idx Lock index.
+ */
+ private void signalNextWaiter(int writeWaitCnt, int idx) {
+ // Note that we signal all waiters for this stripe. Since not all waiters in this stripe/index belong
+ // to this particular lock, we can't wake up just one of them.
+ if (writeWaitCnt == 0) {
+ Condition readCondition = readConditions[idx];
+
+ readCondition.signalAll();
+ } else {
+ Condition writeCond = writeConditions[idx];
+
+ writeCond.signalAll();
+ }
+ }
+
+ /**
+ * Upgrades a read lock to a write lock. If this thread is the only read-owner of the read lock,
+ * this method will atomically upgrade the read lock to the write lock. In this case {@code true}
+ * will be returned. If not, the read lock will be released and write lock will be acquired, leaving
+ * a potential gap for other threads to modify a protected resource. In this case this method will return
+ * {@code false}.
+ *
+ * <p>After this method has been called, there is no need to call to {@link #readUnlock(long)} because
+ * read lock will be released in any case.
+ *
+ * @param lock Lock to upgrade.
+ * @return {@code null} if tag validation failed, {@code true} if successfully traded the read lock to
+ * the write lock without leaving a gap. Returns {@code false} otherwise, in this case the resource
+ * state must be re-validated.
+ */
+ public Boolean upgradeToWriteLock(long lock, int tag) {
+ for (int i = 0; i < SPIN_CNT; i++) {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ if (!checkTag(state, tag)) {
+ return null;
+ }
+
+ if (lockCount(state) == 1) {
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -2, 0, 0))) {
+ return true;
+ } else {
+ // Retry CAS, do not count as spin cycle.
+ i--;
+ }
+ }
+ }
+
+ int idx = lockIndex(lock);
+
+ ReentrantLock lockObj = locks[idx];
+
+ lockObj.lock();
+
+ try {
+ // First, add write waiter.
+ while (true) {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ if (!checkTag(state, tag)) {
+ return null;
+ }
+
+ if (lockCount(state) == 1) {
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -2, 0, 0))) {
+ return true;
+ } else {
+ continue;
+ }
+ }
+
+ // Remove read lock and add write waiter simultaneously.
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -1, 0, 1))) {
+ break;
+ }
+ }
+
+ return waitAcquireWriteLock(lock, idx, tag);
+ } finally {
+ lockObj.unlock();
+ }
+ }
+
+ /**
+ * Acquires read lock in waiting loop.
+ *
+ * @param lock Lock address.
+ * @param lockIdx Lock index.
+ * @param tag Validation tag.
+ * @return {@code True} if lock was acquired, {@code false} if tag validation failed.
+ */
+ private boolean waitAcquireReadLock(long lock, int lockIdx, int tag) {
+ ReentrantLock lockObj = locks[lockIdx];
+ Condition waitCond = readConditions[lockIdx];
+
+ assert lockObj.isHeldByCurrentThread();
+
+ boolean interrupted = false;
+
+ try {
+ while (true) {
+ try {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ if (!checkTag(state, tag)) {
+ // We cannot lock with this tag, release waiter.
+ long updated = updateState(state, 0, -1, 0);
+
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+ int writeWaitCnt = writersWaitCount(updated);
+
+ signalNextWaiter(writeWaitCnt, lockIdx);
+
+ return false;
+ }
+ } else if (canReadLock(state)) {
+ long updated = updateState(state, 1, -1, 0);
+
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+ return true;
+ }
+ } else {
+ waitCond.await();
+ }
+ } catch (InterruptedException ignore) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Acquires write lock in waiting loop.
+ *
+ * @param lock Lock address.
+ * @param lockIdx Lock index.
+ * @param tag Validation tag.
+ * @return {@code True} if lock was acquired, {@code false} if tag validation failed.
+ */
+ private boolean waitAcquireWriteLock(long lock, int lockIdx, int tag) {
+ ReentrantLock lockObj = locks[lockIdx];
+ Condition waitCond = writeConditions[lockIdx];
+
+ assert lockObj.isHeldByCurrentThread();
+
+ boolean interrupted = false;
+
+ try {
+ while (true) {
+ try {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ if (!checkTag(state, tag)) {
+ // We cannot lock with this tag, release waiter.
+ long updated = updateState(state, 0, 0, -1);
+
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+ int writeWaitCnt = writersWaitCount(updated);
+
+ signalNextWaiter(writeWaitCnt, lockIdx);
+
+ return false;
+ }
+ } else if (canWriteLock(state)) {
+ long updated = updateState(state, -1, 0, -1);
+
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+ return true;
+ }
+ } else {
+ waitCond.await();
+ }
+ } catch (InterruptedException ignore) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Returns index of lock object corresponding to the stripe of this lock address.
+ *
+ * @param lock Lock address.
+ * @return Lock monitor object that corresponds to the stripe for this lock address.
+ */
+ private int lockIndex(long lock) {
+ return IgniteUtils.safeAbs(IgniteUtils.hash(lock)) & monitorsMask;
+ }
+
+ /**
+ * Checks that read lock can be acquired.
+ *
+ * @param state Lock state.
+ * @return {@code True} if write lock is not acquired.
+ */
+ private boolean canReadLock(long state) {
+ return lockCount(state) >= 0;
+ }
+
+ /**
+ * Checks that write lock can be acquired.
+ *
+ * @param state Lock state.
+ * @return {@code True} if no read locks are acquired.
+ */
+ private boolean canWriteLock(long state) {
+ return lockCount(state) == 0;
+ }
+
+ /**
+ * Checks that tag in the state matches the expected value.
+ *
+ * @param state State.
+ * @param tag Tag.
+ */
+ private boolean checkTag(long state, int tag) {
+ // If passed in tag is negative, lock regardless of the state.
+ return tag < 0 || tag(state) == tag;
+ }
+
+ /**
+ * Extracts lock count from the state.
+ *
+ * @param state State.
+ * @return Lock count.
+ */
+ private int lockCount(long state) {
+ return (short) (state & 0xFFFF);
+ }
+
+ /**
+ * Extracts tag value from the state.
+ *
+ * @param state Lock state.
+ * @return Lock tag.
+ */
+ private int tag(long state) {
+ return (int) ((state >>> 16) & 0xFFFF);
+ }
+
+ /**
+ * Extracts writers wait count from the state.
+ *
+ * @param state State.
+ * @return Writers wait count.
+ */
+ private int writersWaitCount(long state) {
+ return (int) ((state >>> 48) & 0xFFFF);
+ }
+
+ /**
+ * Extracts readers wait count from the state.
+ *
+ * @param state State.
+ * @return Readers wait count.
+ */
+ private int readersWaitCount(long state) {
+ return (int) ((state >>> 32) & 0xFFFF);
+ }
+
+ /**
+ * Updates lock state with deltas.
+ *
+ * @param state State to update.
+ * @param lockDelta Lock counter delta.
+ * @param readersWaitDelta Readers wait delta.
+ * @param writersWaitDelta Writers wait delta.
+ * @return Modified state.
+ */
+ private long updateState(long state, int lockDelta, int readersWaitDelta, int writersWaitDelta) {
+ int lock = lockCount(state);
+ int tag = tag(state);
+ int readersWait = readersWaitCount(state);
+ int writersWait = writersWaitCount(state);
+
+ lock += lockDelta;
+ readersWait += readersWaitDelta;
+ writersWait += writersWaitDelta;
+
+ if (readersWait > MAX_WAITERS) {
+ throw new IllegalStateException("Failed to add read waiter (too many waiting threads): " + MAX_WAITERS);
+ }
+
+ if (writersWait > MAX_WAITERS) {
+ throw new IllegalStateException("Failed to add write waiter (too many waiting threads): " + MAX_WAITERS);
+ }
+
+ assert readersWait >= 0 : readersWait;
+ assert writersWait >= 0 : writersWait;
+ assert lock >= -1;
+
+ return buildState(writersWait, readersWait, tag, lock);
+ }
+
+ /**
+ * Releases write lock and updates tag value.
+ *
+ * @param state State to update.
+ * @return Modified state.
+ */
+ private long releaseWithTag(long state, int newTag) {
+ int lock = lockCount(state);
+ int readersWait = readersWaitCount(state);
+ int writersWait = writersWaitCount(state);
+ int tag = newTag == TAG_LOCK_ALWAYS ? tag(state) : newTag & 0xFFFF;
+
+ lock += 1;
+
+ assert readersWait >= 0 : readersWait;
+ assert writersWait >= 0 : writersWait;
+ assert lock >= -1;
+
+ return buildState(writersWait, readersWait, tag, lock);
+ }
+
+ /**
+ * Creates state from counters.
+ *
+ * @param writersWait Writers wait count.
+ * @param readersWait Readers wait count.
+ * @param tag Tag.
+ * @param lock Lock count.
+ * @return State.
+ */
+ private long buildState(int writersWait, int readersWait, int tag, int lock) {
+ assert (tag & 0xFFFF0000) == 0;
+
+ return ((long) writersWait << 48) | ((long) readersWait << 32) | ((tag & 0x0000FFFFL) << 16) | (lock & 0xFFFFL);
+ }
+
+ /**
+ * Updates readers wait count.
+ *
+ * @param lock Lock to update.
+ * @param delta Delta to update.
+ */
+ private void updateReadersWaitCount(long lock, ReentrantLock lockObj, int delta) {
+ assert lockObj.isHeldByCurrentThread();
+
+ while (true) {
+ // Safe to do non-volatile read because of CAS below.
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ long updated = updateState(state, 0, delta, 0);
+
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Updates writers wait count.
+ *
+ * @param lock Lock to update.
+ * @param delta Delta to update.
+ */
+ private void updateWritersWaitCount(long lock, ReentrantLock lockObj, int delta) {
+ assert lockObj.isHeldByCurrentThread();
+
+ while (true) {
+ long state = GridUnsafe.getLongVolatile(null, lock);
+
+ long updated = updateState(state, 0, 0, delta);
+
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+ return;
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 0d58dcf..5cee67b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -23,10 +23,15 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadFactory;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteInternalException;
@@ -203,7 +208,7 @@
* @param task Runnable.
* @return Future with task result.
*/
- public static CompletableFuture runAsync(final Runnable task) {
+ public static CompletableFuture<?> runAsync(final Runnable task) {
return runAsync(task, "async-runnable-runner");
}
@@ -213,7 +218,7 @@
* @param task Runnable.
* @return Future with task result.
*/
- public static CompletableFuture runAsync(final Runnable task, String threadName) {
+ public static CompletableFuture<?> runAsync(final Runnable task, String threadName) {
return runAsync(() -> {
task.run();
@@ -258,6 +263,96 @@
}
/**
+ * Runs callable tasks each in separate threads.
+ *
+ * @param calls Callable tasks.
+ * @param threadFactory Thread factory.
+ * @return Execution time in milliseconds.
+ * @throws Exception If failed.
+ */
+ public static long runMultiThreaded(Iterable<Callable<?>> calls, ThreadFactory threadFactory) throws Exception {
+ Collection<Thread> threads = new ArrayList<>();
+
+ Collection<CompletableFuture<?>> futures = new ArrayList<>();
+
+ for (Callable<?> task : calls) {
+ CompletableFuture<?> fut = new CompletableFuture<>();
+
+ futures.add(fut);
+
+ threads.add(threadFactory.newThread(() -> {
+ try {
+ // Execute task.
+ task.call();
+
+ fut.complete(null);
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
+ }
+ }));
+ }
+
+ long time = System.currentTimeMillis();
+
+ for (Thread t : threads) {
+ t.start();
+ }
+
+ // Wait threads finish their job.
+ try {
+ for (Thread t : threads) {
+ t.join();
+ }
+ } catch (InterruptedException e) {
+ for (Thread t : threads) {
+ t.interrupt();
+ }
+
+ throw e;
+ }
+
+ time = System.currentTimeMillis() - time;
+
+ for (CompletableFuture<?> fut : futures) {
+ fut.join();
+ }
+
+ return time;
+ }
+
+ /**
+ * Runs runnable object in specified number of threads.
+ *
+ * @param run Target runnable.
+ * @param threadNum Number of threads.
+ * @param threadName Thread name.
+ * @return Future for the run. Future returns execution time in milliseconds.
+ */
+ public static CompletableFuture<Long> runMultiThreadedAsync(Runnable run, int threadNum, String threadName) {
+ return runMultiThreadedAsync(() -> {
+ run.run();
+
+ return null;
+ }, threadNum, threadName);
+ }
+
+ /**
+ * Runs callable object in specified number of threads.
+ *
+ * @param call Callable.
+ * @param threadNum Number of threads.
+ * @param threadName Thread names.
+ * @return Future for the run. Future returns execution time in milliseconds.
+ */
+ public static CompletableFuture<Long> runMultiThreadedAsync(Callable<?> call, int threadNum, final String threadName) {
+ List<Callable<?>> calls = Collections.<Callable<?>>nCopies(threadNum, call);
+
+ NamedThreadFactory threadFactory = new NamedThreadFactory(threadName);
+
+ return runAsync(() -> runMultiThreaded(calls, threadFactory));
+ }
+
+ /**
* Waits for the condition.
*
* @param cond Condition.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteOffheapReadWriteLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteOffheapReadWriteLockSelfTest.java
new file mode 100644
index 0000000..1828710
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteOffheapReadWriteLockSelfTest.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.Test;
+
+/** Tests basic invariants of {@link OffheapReadWriteLock}. */
+public class IgniteOffheapReadWriteLockSelfTest extends BaseIgniteAbstractTest {
+ /** Initial value for tag in tests. */
+ private static final int TAG_0 = 1;
+
+ /** Number of 1-second iterations in every test. */
+ private static final int ROUNDS_PER_TEST = 3;
+
+ /** Sleep interval for the test. */
+ private static final long SLEEP_TIME = 50L;
+
+ @Test
+ public void testConcurrentUpdatesSingleLock() throws Exception {
+ final int numPairs = 100;
+ final Pair[] data = new Pair[numPairs];
+
+ for (int i = 0; i < numPairs; i++) {
+ data[i] = new Pair();
+ }
+
+ final OffheapReadWriteLock lock = new OffheapReadWriteLock(16);
+
+ final long ptr = GridUnsafe.allocateMemory(OffheapReadWriteLock.LOCK_SIZE);
+
+ lock.init(ptr, TAG_0);
+
+ final AtomicInteger reads = new AtomicInteger();
+ final AtomicInteger writes = new AtomicInteger();
+ final AtomicBoolean done = new AtomicBoolean(false);
+
+ CompletableFuture<Long> fut = IgniteTestUtils.runMultiThreadedAsync(() -> {
+ try {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!done.get()) {
+ boolean write = rnd.nextInt(10) < 2;
+
+ if (write) {
+ boolean locked = lock.writeLock(ptr, TAG_0);
+
+ try {
+ // No tag change in this test.
+ assert locked;
+
+ assertTrue(lock.isWriteLocked(ptr));
+ assertFalse(lock.isReadLocked(ptr));
+
+ int idx = rnd.nextInt(numPairs);
+ int delta = rnd.nextInt(100_000);
+
+ data[idx].left += delta;
+ data[idx].right -= delta;
+ } finally {
+ lock.writeUnlock(ptr, TAG_0);
+ }
+
+ writes.incrementAndGet();
+ } else {
+ boolean locked = lock.readLock(ptr, TAG_0);
+
+ try {
+ assert locked;
+
+ assertFalse(lock.isWriteLocked(ptr));
+ assertTrue(lock.isReadLocked(ptr));
+
+ for (int i1 = 0; i1 < data.length; i1++) {
+ Pair pair = data[i1];
+
+ assertEquals(pair.left, -pair.right, "Failed check for index: " + i1);
+ }
+ } finally {
+ lock.readUnlock(ptr);
+ }
+
+ reads.incrementAndGet();
+ }
+ }
+ } catch (Throwable e) {
+ log.error(e.getMessage(), e);
+ }
+
+ return null;
+ }, 32, "tester");
+
+ for (int i = 0; i < ROUNDS_PER_TEST; i++) {
+ Thread.sleep(SLEEP_TIME);
+
+ log.info("Reads: " + reads.getAndSet(0) + ", writes=" + writes.getAndSet(0));
+ }
+
+ done.set(true);
+
+ fut.get();
+
+ validate(data);
+ }
+
+ @Test
+ public void testConcurrentUpdatesMultipleLocks() throws Exception {
+ final int numPairs = 100;
+ final Pair[] data = new Pair[numPairs];
+
+ final OffheapReadWriteLock lock = new OffheapReadWriteLock(16);
+
+ final long ptr = GridUnsafe.allocateMemory(OffheapReadWriteLock.LOCK_SIZE * numPairs);
+
+ for (int i = 0; i < numPairs; i++) {
+ data[i] = new Pair();
+
+ lock.init(ptr + i * OffheapReadWriteLock.LOCK_SIZE, TAG_0);
+ }
+
+ final AtomicInteger reads = new AtomicInteger();
+ final AtomicInteger writes = new AtomicInteger();
+ final AtomicBoolean done = new AtomicBoolean(false);
+
+ CompletableFuture<Long> fut = IgniteTestUtils.runMultiThreadedAsync(() -> {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!done.get()) {
+ boolean write = rnd.nextInt(10) < 2;
+ int idx = rnd.nextInt(numPairs);
+
+ long lockPtr = ptr + idx * OffheapReadWriteLock.LOCK_SIZE;
+
+ if (write) {
+ lock.writeLock(lockPtr, TAG_0);
+
+ try {
+ assertTrue(lock.isWriteLocked(lockPtr));
+ assertFalse(lock.isReadLocked(lockPtr));
+
+ int delta = rnd.nextInt(100_000);
+
+ data[idx].left += delta;
+ data[idx].right -= delta;
+ } finally {
+ lock.writeUnlock(lockPtr, TAG_0);
+ }
+
+ writes.incrementAndGet();
+ } else {
+ lock.readLock(lockPtr, TAG_0);
+
+ try {
+ assertFalse(lock.isWriteLocked(lockPtr));
+ assertTrue(lock.isReadLocked(lockPtr));
+
+ Pair pair = data[idx];
+
+ assertEquals(pair.left, -pair.right, "Failed check for index: " + idx);
+ } finally {
+ lock.readUnlock(lockPtr);
+ }
+
+ reads.incrementAndGet();
+ }
+ }
+
+ return null;
+ }, 32, "tester");
+
+ for (int i = 0; i < ROUNDS_PER_TEST; i++) {
+ Thread.sleep(SLEEP_TIME);
+
+ log.info("Reads: " + reads.getAndSet(0) + ", writes=" + writes.getAndSet(0));
+ }
+
+ done.set(true);
+
+ fut.get();
+
+ validate(data);
+ }
+
+ @Test
+ public void testLockUpgradeMultipleLocks() throws Exception {
+ final int numPairs = 100;
+ final Pair[] data = new Pair[numPairs];
+
+ final OffheapReadWriteLock lock = new OffheapReadWriteLock(16);
+
+ final long ptr = GridUnsafe.allocateMemory(OffheapReadWriteLock.LOCK_SIZE * numPairs);
+
+ for (int i = 0; i < numPairs; i++) {
+ data[i] = new Pair();
+
+ lock.init(ptr + i * OffheapReadWriteLock.LOCK_SIZE, TAG_0);
+ }
+
+ final AtomicInteger reads = new AtomicInteger();
+ final AtomicInteger writes = new AtomicInteger();
+ final AtomicInteger successfulUpgrades = new AtomicInteger();
+ final AtomicBoolean done = new AtomicBoolean(false);
+
+ CompletableFuture<Long> fut = IgniteTestUtils.runMultiThreadedAsync(() -> {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!done.get()) {
+ int idx = rnd.nextInt(numPairs);
+
+ long lockPtr = ptr + idx * OffheapReadWriteLock.LOCK_SIZE;
+
+ boolean locked = lock.readLock(lockPtr, TAG_0);
+
+ boolean write = false;
+
+ try {
+ assert locked;
+
+ Pair pair = data[idx];
+
+ assertEquals(pair.left, -pair.right, "Failed check for index: " + idx);
+
+ write = rnd.nextInt(10) < 2;
+
+ if (write) {
+ // TAG fail will cause NPE.
+ boolean upg = lock.upgradeToWriteLock(lockPtr, TAG_0);
+
+ writes.incrementAndGet();
+
+ if (upg) {
+ successfulUpgrades.incrementAndGet();
+ }
+
+ int delta = rnd.nextInt(100_000);
+
+ pair.left += delta;
+ pair.right -= delta;
+ }
+ } finally {
+ if (write) {
+ lock.writeUnlock(lockPtr, TAG_0);
+ } else {
+ lock.readUnlock(lockPtr);
+ }
+ }
+
+ reads.incrementAndGet();
+ }
+
+ return null;
+ }, 32, "tester");
+
+ for (int i = 0; i < ROUNDS_PER_TEST; i++) {
+ Thread.sleep(SLEEP_TIME);
+
+ log.info("Reads=" + reads.getAndSet(0) + ", writes=" + writes.getAndSet(0) + ", upgrades=" + successfulUpgrades.getAndSet(0));
+ }
+
+ done.set(true);
+
+ fut.get();
+
+ validate(data);
+ }
+
+ @Test
+ public void testTagIdUpdateWait() throws Exception {
+ checkTagIdUpdate(true);
+ }
+
+ @Test
+ public void testTagIdUpdateContinuous() throws Exception {
+ checkTagIdUpdate(false);
+ }
+
+ private void checkTagIdUpdate(final boolean waitBeforeSwitch) throws Exception {
+ final int numPairs = 100;
+ final Pair[] data = new Pair[numPairs];
+
+ for (int i = 0; i < numPairs; i++) {
+ data[i] = new Pair();
+ }
+
+ final OffheapReadWriteLock lock = new OffheapReadWriteLock(16);
+
+ final long ptr = GridUnsafe.allocateMemory(OffheapReadWriteLock.LOCK_SIZE);
+
+ lock.init(ptr, TAG_0);
+
+ final AtomicInteger reads = new AtomicInteger();
+ final AtomicInteger writes = new AtomicInteger();
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final AtomicBoolean run = new AtomicBoolean(true);
+
+ final int threadCnt = 32;
+
+ final CyclicBarrier barr = new CyclicBarrier(threadCnt, () -> {
+ if (done.get()) {
+ run.set(false);
+ }
+ });
+
+ CompletableFuture<Long> fut = IgniteTestUtils.runMultiThreadedAsync(() -> {
+ try {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int tag = TAG_0;
+
+ long lastSwitch = System.currentTimeMillis();
+
+ while (run.get()) {
+ boolean write = rnd.nextInt(10) < 2;
+
+ boolean locked;
+
+ boolean switched = false;
+
+ if (write) {
+ locked = lock.writeLock(ptr, tag);
+
+ if (locked) {
+ try {
+ assertTrue(lock.isWriteLocked(ptr));
+ assertFalse(lock.isReadLocked(ptr));
+
+ int idx = rnd.nextInt(numPairs);
+ int delta = rnd.nextInt(100_000);
+
+ data[idx].left += delta;
+ data[idx].right -= delta;
+ } finally {
+ switched = System.currentTimeMillis() - lastSwitch > 20 || !waitBeforeSwitch;
+
+ if (switched && waitBeforeSwitch) {
+ log.info("Switching...");
+ }
+
+ int tag1 = (tag + (switched ? 1 : 0)) & 0xFFFF;
+
+ if (tag1 == 0) {
+ tag1 = 1;
+ }
+
+ lock.writeUnlock(ptr, tag1);
+ }
+
+ writes.incrementAndGet();
+ }
+ } else {
+ locked = lock.readLock(ptr, tag);
+
+ if (locked) {
+ try {
+ assert locked;
+
+ assertFalse(lock.isWriteLocked(ptr));
+ assertTrue(lock.isReadLocked(ptr));
+
+ for (int i1 = 0; i1 < data.length; i1++) {
+ Pair pair = data[i1];
+
+ assertEquals(pair.left, -pair.right, "Failed check for index: " + i1);
+ }
+ } finally {
+ lock.readUnlock(ptr);
+ }
+
+ reads.incrementAndGet();
+ }
+ }
+
+ if (!locked || switched) {
+ try {
+ barr.await();
+ } catch (BrokenBarrierException e) {
+ // Done.
+ log.error(e.getMessage(), e);
+
+ return null;
+ }
+
+ tag = (tag + 1) & 0xFFFF;
+
+ if (tag == 0) {
+ tag = 1;
+ }
+
+ if (waitBeforeSwitch || (!waitBeforeSwitch && tag == 1)) {
+ log.info("Switch to a new tag: " + tag);
+ }
+
+ lastSwitch = System.currentTimeMillis();
+ }
+ }
+ } catch (Throwable e) {
+ log.error(e.getMessage(), e);
+ }
+
+ return null;
+ }, threadCnt, "tester");
+
+ for (int i = 0; i < ROUNDS_PER_TEST; i++) {
+ Thread.sleep(SLEEP_TIME);
+
+ log.info("Reads: " + reads.getAndSet(0) + ", writes=" + writes.getAndSet(0));
+ }
+
+ done.set(true);
+
+ fut.get();
+
+ validate(data);
+ }
+
+ /**
+ * Validates data integrity.
+ *
+ * @param data Data to validate.
+ */
+ private void validate(Pair[] data) {
+ for (int i = 0; i < data.length; i++) {
+ Pair pair = data[i];
+
+ assertEquals(pair.left, -pair.right, "Failed for index: " + i);
+ }
+ }
+
+ /** Pair of integers. */
+ private static class Pair {
+ /** Left value of the pair. */
+ private int left;
+
+ /** Right value of the pair. */
+ private int right;
+ }
+}
diff --git a/modules/page-memory/pom.xml b/modules/page-memory/pom.xml
index 9785896..55a8871 100644
--- a/modules/page-memory/pom.xml
+++ b/modules/page-memory/pom.xml
@@ -35,6 +35,11 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
</dependency>
@@ -44,5 +49,25 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
</dependencies>
</project>
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemory.java
index 88e63f3..80d92ed 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemory.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import org.apache.ignite.internal.pagememory.io.PageIo;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.lang.IgniteInternalException;
/**
* Class responsible for pages storage and handling.
@@ -27,6 +28,18 @@
//TODO IGNITE-16350 Improve javadoc in this class.
public interface PageMemory extends PageIdAllocator, PageSupport {
/**
+ * Starts page memory.
+ */
+ void start() throws IgniteInternalException;
+
+ /**
+ * Stops page memory.
+ *
+ * @param deallocate {@code True} to deallocate memory, {@code false} to allow memory reuse on subsequent {@link #start()}
+ */
+ void stop(boolean deallocate) throws IgniteInternalException;
+
+ /**
* Returns a page's size in bytes.
*/
int pageSize();
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java
new file mode 100644
index 0000000..dd69e13
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java
@@ -0,0 +1,890 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.impl;
+
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IgniteSystemProperties;
+
+/**
+ * Page header structure is described by the following diagram.
+ *
+ * <p>When page is not allocated (in a free list):
+ * <pre>
+ * +--------+--------+---------------------------------------------+
+ * |8 bytes |8 bytes | PAGE_SIZE + PAGE_OVERHEAD - 16 bytes |
+ * +--------+--------+---------------------------------------------+
+ * |Next ptr|Rel ptr | Empty |
+ * +--------+--------+---------------------------------------------+
+ * </pre>
+ * <p/>
+ * When page is allocated and is in use:
+ * <pre>
+ * +--------+--------+--------+--------+---------------------------+
+ * |8 bytes |8 bytes |8 bytes |8 bytes | PAGE_SIZE |
+ * +--------+--------+--------+--------+---------------------------+
+ * | Marker |Page ID |Pin CNT | Lock | Page data |
+ * +--------+--------+--------+--------+---------------------------+
+ * </pre>
+ *
+ * <p>Note that first 8 bytes of page header are used either for page marker or for next relative pointer depending
+ * on whether the page is in use or not.
+ */
+public class PageMemoryNoStoreImpl implements PageMemory {
+ /** Logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryNoStoreImpl.class);
+
+ /** Ignite page memory concurrency level. */
+ private static final String IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL = "IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL";
+
+ /** Marker bytes that signify beginning of used page in memory. */
+ public static final long PAGE_MARKER = 0xBEEAAFDEADBEEF01L;
+
+ /** Full relative pointer mask. */
+ private static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
+
+ /** Invalid relative pointer value. */
+ private static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+
+ /** Address mask to avoid ABA problem. */
+ private static final long ADDRESS_MASK = 0xFFFFFFFFFFFFFFL;
+
+ /** Counter mask to avoid ABA problem. */
+ private static final long COUNTER_MASK = ~ADDRESS_MASK;
+
+ /** Counter increment to avoid ABA problem. */
+ private static final long COUNTER_INC = ADDRESS_MASK + 1;
+
+ /** Page ID offset. */
+ public static final int PAGE_ID_OFFSET = 8;
+
+ /** Page pin counter offset. */
+ public static final int LOCK_OFFSET = 16;
+
+ /**
+ * Need a 8-byte pointer for linked list, 8 bytes for internal needs (flags),
+ * 4 bytes cache ID, 8 bytes timestamp.
+ */
+ public static final int PAGE_OVERHEAD = LOCK_OFFSET + OffheapReadWriteLock.LOCK_SIZE;
+
+ /** Number of bits required to store segment index. */
+ private static final int SEG_BITS = 4;
+
+ /** Number of bits required to store segment index. */
+ private static final int SEG_CNT = (1 << SEG_BITS);
+
+ /** Number of bits left to store page index. */
+ private static final int IDX_BITS = PageIdUtils.PAGE_IDX_SIZE - SEG_BITS;
+
+ /** Segment mask. */
+ private static final int SEG_MASK = ~(-1 << SEG_BITS);
+
+ /** Index mask. */
+ private static final int IDX_MASK = ~(-1 << IDX_BITS);
+
+ /** Page size. */
+ private final int sysPageSize;
+
+ /** Direct memory allocator. */
+ private final DirectMemoryProvider directMemoryProvider;
+
+ /** Data region configuration view. */
+ private final PageMemoryDataRegionView dataRegionCfg;
+
+ /** Head of the singly linked list of free pages. */
+ private final AtomicLong freePageListHead = new AtomicLong(INVALID_REL_PTR);
+
+ /** Segments array. */
+ private volatile Segment[] segments;
+
+ /** Lock for segments changes. */
+ private final Object segmentsLock = new Object();
+
+ /** Total number of pages loaded into memory. */
+ private final AtomicInteger allocatedPages = new AtomicInteger();
+
+ /** Offheap read write lock instance. */
+ private final OffheapReadWriteLock rwLock;
+
+ /** Concurrency level. */
+ private final int lockConcLvl = IgniteSystemProperties.getInteger(
+ IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL,
+ Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 4)
+ );
+
+ /** Total number of pages may be allocated for this instance. */
+ private final int totalPages;
+
+ /** Flag for enabling of acquired pages tracking. */
+ private final boolean trackAcquiredPages;
+
+ /** Page IO registry. */
+ private final PageIoRegistry ioRegistry;
+
+ /**
+ * {@code False} if memory was not started or already stopped and is not supposed for any usage.
+ */
+ private volatile boolean started;
+
+ /**
+ * Constructor.
+ *
+ * @param directMemoryProvider Memory allocator to use.
+ * @param dataRegionCfg Data region configuration.
+ * @param ioRegistry IO registry.
+ */
+ public PageMemoryNoStoreImpl(
+ DirectMemoryProvider directMemoryProvider,
+ PageMemoryDataRegionConfiguration dataRegionCfg,
+ PageIoRegistry ioRegistry
+ ) {
+ this.directMemoryProvider = directMemoryProvider;
+ this.ioRegistry = ioRegistry;
+ this.trackAcquiredPages = false;
+ this.dataRegionCfg = (PageMemoryDataRegionView) dataRegionCfg.value();
+
+ int pageSize = this.dataRegionCfg.pageSize();
+
+ sysPageSize = pageSize + PAGE_OVERHEAD;
+
+ assert sysPageSize % 8 == 0 : sysPageSize;
+
+ totalPages = (int) (this.dataRegionCfg.maxSize() / sysPageSize);
+
+ rwLock = new OffheapReadWriteLock(lockConcLvl);
+ }
+
+ @Override
+ public void start() throws IgniteInternalException {
+ synchronized (segmentsLock) {
+ if (started) {
+ return;
+ }
+
+ started = true;
+
+ long startSize = dataRegionCfg.initSize();
+ long maxSize = dataRegionCfg.maxSize();
+
+ long[] chunks = new long[SEG_CNT];
+
+ chunks[0] = startSize;
+
+ long total = startSize;
+
+ long allocChunkSize = Math.max((maxSize - startSize) / (SEG_CNT - 1), 256L * 1024 * 1024);
+
+ int lastIdx = 0;
+
+ for (int i = 1; i < SEG_CNT; i++) {
+ long allocSize = Math.min(allocChunkSize, maxSize - total);
+
+ if (allocSize <= 0) {
+ break;
+ }
+
+ chunks[i] = allocSize;
+
+ total += allocSize;
+
+ lastIdx = i;
+ }
+
+ if (lastIdx != SEG_CNT - 1) {
+ chunks = Arrays.copyOf(chunks, lastIdx + 1);
+ }
+
+ if (segments == null) {
+ directMemoryProvider.initialize(chunks);
+ }
+
+ addSegment(null);
+ }
+ }
+
+ @Override
+ public void stop(boolean deallocate) throws IgniteInternalException {
+ synchronized (segmentsLock) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping page memory.");
+ }
+
+ started = false;
+
+ directMemoryProvider.shutdown(deallocate);
+
+ if (directMemoryProvider instanceof Closeable) {
+ try {
+ ((Closeable) directMemoryProvider).close();
+ } catch (IOException e) {
+ throw new IgniteInternalException(e);
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer pageBuffer(long pageAddr) {
+ return wrapPointer(pageAddr, pageSize());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long allocatePage(int grpId, int partId, byte flags) {
+ assert started;
+
+ long relPtr = borrowFreePage();
+ long absPtr = 0;
+
+ if (relPtr != INVALID_REL_PTR) {
+ int pageIdx = PageIdUtils.pageIndex(relPtr);
+
+ Segment seg = segment(pageIdx);
+
+ absPtr = seg.absolute(pageIdx);
+ } else {
+ // No segments contained a free page.
+ Segment[] seg0 = segments;
+ Segment allocSeg = seg0[seg0.length - 1];
+
+ while (allocSeg != null) {
+ relPtr = allocSeg.allocateFreePage(flags);
+
+ if (relPtr != INVALID_REL_PTR) {
+ absPtr = allocSeg.absolute(PageIdUtils.pageIndex(relPtr));
+
+ allocatedPages.incrementAndGet();
+
+ break;
+ } else {
+ allocSeg = addSegment(seg0);
+ }
+ }
+ }
+
+ if (relPtr == INVALID_REL_PTR) {
+ IgniteOutOfMemoryException oom = new IgniteOutOfMemoryException("Out of memory in data region ["
+ + "name=" + dataRegionCfg.name()
+ + ", initSize=" + IgniteUtils.readableSize(dataRegionCfg.initSize(), false)
+ + ", maxSize=" + IgniteUtils.readableSize(dataRegionCfg.maxSize(), false)
+ + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:\n"
+ + " ^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)\n"
+ + " ^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)\n"
+ + " ^-- Enable eviction or expiration policies"
+ );
+
+ //TODO Fail node with failure handler.
+
+ throw oom;
+ }
+
+ assert (relPtr & ~PageIdUtils.PAGE_IDX_MASK) == 0 : IgniteUtils.hexLong(relPtr & ~PageIdUtils.PAGE_IDX_MASK);
+
+ // Assign page ID according to flags and partition ID.
+ long pageId = PageIdUtils.pageId(partId, flags, (int) relPtr);
+
+ writePageId(absPtr, pageId);
+
+ GridUnsafe.zeroMemory(absPtr + PAGE_OVERHEAD, sysPageSize - PAGE_OVERHEAD);
+
+ return pageId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean freePage(int grpId, long pageId) {
+ assert started;
+
+ releaseFreePage(pageId);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int pageSize() {
+ return sysPageSize - PAGE_OVERHEAD;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int systemPageSize() {
+ return sysPageSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int realPageSize(int grpId) {
+ return pageSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long loadedPages() {
+ return allocatedPages.get();
+ }
+
+ /**
+ * Returns a total number of pages may be allocated for this instance.
+ */
+ public int totalPages() {
+ return totalPages;
+ }
+
+ /**
+ * Return a total number of acquired pages.
+ */
+ public long acquiredPages() {
+ long total = 0;
+
+ for (Segment seg : segments) {
+ seg.readLock().lock();
+
+ try {
+ int acquired = seg.acquiredPages();
+
+ assert acquired >= 0;
+
+ total += acquired;
+ } finally {
+ seg.readLock().unlock();
+ }
+ }
+
+ return total;
+ }
+
+ /**
+ * Writes page ID to the page at the given absolute position.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @param pageId Page ID to write.
+ */
+ private void writePageId(long absPtr, long pageId) {
+ GridUnsafe.putLong(absPtr + PAGE_ID_OFFSET, pageId);
+ }
+
+ /**
+ * Returns the segment that contains given page.
+ *
+ * @param pageIdx Page index.
+ * @return Segment.
+ */
+ private Segment segment(int pageIdx) {
+ int segIdx = segmentIndex(pageIdx);
+
+ return segments[segIdx];
+ }
+
+ /**
+ * Extracts a segment index from the full page index.
+ *
+ * @param pageIdx Page index to extract segment index from.
+ * @return Segment index.
+ */
+ private int segmentIndex(long pageIdx) {
+ return (int) ((pageIdx >> IDX_BITS) & SEG_MASK);
+ }
+
+ /**
+ * Creates a full page index.
+ *
+ * @param segIdx Segment index.
+ * @param pageIdx Page index inside of the segment.
+ * @return Full page index.
+ */
+ private long fromSegmentIndex(int segIdx, long pageIdx) {
+ long res = 0;
+
+ res = (res << SEG_BITS) | (segIdx & SEG_MASK);
+ res = (res << IDX_BITS) | (pageIdx & IDX_MASK);
+
+ return res;
+ }
+
+ // *** PageSupport methods ***
+
+ /** {@inheritDoc} */
+ @Override public long acquirePage(int cacheId, long pageId) {
+ return acquirePage(cacheId, pageId, IoStatisticsHolderNoOp.INSTANCE);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long acquirePage(int cacheId, long pageId, IoStatisticsHolder statHolder) {
+ assert started;
+
+ int pageIdx = PageIdUtils.pageIndex(pageId);
+
+ Segment seg = segment(pageIdx);
+
+ long absPtr = seg.acquirePage(pageIdx);
+
+ statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+
+ return absPtr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void releasePage(int cacheId, long pageId, long page) {
+ assert started;
+
+ if (trackAcquiredPages) {
+ Segment seg = segment(PageIdUtils.pageIndex(pageId));
+
+ seg.onPageRelease();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLock(int cacheId, long pageId, long page) {
+ assert started;
+
+ if (rwLock.readLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId))) {
+ return page + PAGE_OVERHEAD;
+ }
+
+ return 0L;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLockForce(int cacheId, long pageId, long page) {
+ assert started;
+
+ if (rwLock.readLock(page + LOCK_OFFSET, -1)) {
+ return page + PAGE_OVERHEAD;
+ }
+
+ return 0L;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readUnlock(int cacheId, long pageId, long page) {
+ assert started;
+
+ rwLock.readUnlock(page + LOCK_OFFSET);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long writeLock(int cacheId, long pageId, long page) {
+ assert started;
+
+ if (rwLock.writeLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId))) {
+ return page + PAGE_OVERHEAD;
+ }
+
+ return 0L;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long tryWriteLock(int cacheId, long pageId, long page) {
+ assert started;
+
+ if (rwLock.tryWriteLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId))) {
+ return page + PAGE_OVERHEAD;
+ }
+
+ return 0L;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void writeUnlock(
+ int cacheId,
+ long pageId,
+ long page,
+ boolean dirtyFlag
+ ) {
+ assert started;
+
+ long actualId = PageIo.getPageId(page + PAGE_OVERHEAD);
+
+ rwLock.writeUnlock(page + LOCK_OFFSET, PageIdUtils.tag(actualId));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDirty(int cacheId, long pageId, long page) {
+ // always false for page no store.
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public PageIoRegistry ioRegistry() {
+ return ioRegistry;
+ }
+
+ /**
+ * Converts page index into a sequence number.
+ *
+ * @param pageIdx Page index.
+ * @return Total page sequence number.
+ */
+ public int pageSequenceNumber(int pageIdx) {
+ Segment seg = segment(pageIdx);
+
+ return seg.sequenceNumber(pageIdx);
+ }
+
+ /**
+ * Converts sequential page number to the page index.
+ *
+ * @param seqNo Page sequence number.
+ * @return Page index.
+ */
+ public int pageIndex(int seqNo) {
+ Segment[] segs = segments;
+
+ int low = 0;
+ int high = segs.length - 1;
+
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+
+ Segment seg = segs[mid];
+
+ int cmp = seg.containsPageBySequence(seqNo);
+
+ if (cmp < 0) {
+ high = mid - 1;
+ } else if (cmp > 0) {
+ low = mid + 1;
+ } else {
+ return seg.pageIndex(seqNo);
+ }
+ }
+
+ throw new IgniteInternalException("Allocated page must always be present in one of the segments [seqNo=" + seqNo
+ + ", segments=" + Arrays.toString(segs) + ']');
+ }
+
+ /**
+ * Adds a page to the free pages list.
+ *
+ * @param pageId Page ID to release.
+ */
+ private void releaseFreePage(long pageId) {
+ int pageIdx = PageIdUtils.pageIndex(pageId);
+
+ // Clear out flags and file ID.
+ long relPtr = PageIdUtils.pageId(0, (byte) 0, pageIdx);
+
+ Segment seg = segment(pageIdx);
+
+ long absPtr = seg.absolute(pageIdx);
+
+ // Second, write clean relative pointer instead of page ID.
+ writePageId(absPtr, relPtr);
+
+ // Third, link the free page.
+ while (true) {
+ long freePageRelPtrMasked = freePageListHead.get();
+
+ long freePageRelPtr = freePageRelPtrMasked & RELATIVE_PTR_MASK;
+
+ GridUnsafe.putLong(absPtr, freePageRelPtr);
+
+ if (freePageListHead.compareAndSet(freePageRelPtrMasked, relPtr)) {
+ allocatedPages.decrementAndGet();
+
+ return;
+ }
+ }
+ }
+
+ /**
+ * Returns a relative pointer to a free page that was borrowed from the allocated pool.
+ */
+ private long borrowFreePage() {
+ while (true) {
+ long freePageRelPtrMasked = freePageListHead.get();
+
+ long freePageRelPtr = freePageRelPtrMasked & ADDRESS_MASK;
+
+ // no free pages available
+ if (freePageRelPtr == INVALID_REL_PTR) {
+ return INVALID_REL_PTR;
+ }
+
+ int pageIdx = PageIdUtils.pageIndex(freePageRelPtr);
+
+ Segment seg = segment(pageIdx);
+
+ long freePageAbsPtr = seg.absolute(pageIdx);
+ long nextFreePageRelPtr = GridUnsafe.getLong(freePageAbsPtr) & ADDRESS_MASK;
+ long cnt = ((freePageRelPtrMasked & COUNTER_MASK) + COUNTER_INC) & COUNTER_MASK;
+
+ if (freePageListHead.compareAndSet(freePageRelPtrMasked, nextFreePageRelPtr | cnt)) {
+ GridUnsafe.putLong(freePageAbsPtr, PAGE_MARKER);
+
+ allocatedPages.incrementAndGet();
+
+ return freePageRelPtr;
+ }
+ }
+ }
+
+ /**
+ * Attempts to add a new memory segment.
+ *
+ * @param oldRef Old segments array. If this method observes another segments array, it will allocate a new
+ * segment (if possible). If the array has already been updated, it will return the last element in the
+ * new array.
+ * @return Added segment, if successfull, {@code null} if failed to add.
+ */
+ private synchronized Segment addSegment(Segment[] oldRef) {
+ if (segments == oldRef) {
+ DirectMemoryRegion region = directMemoryProvider.nextRegion();
+
+ // No more memory is available.
+ if (region == null) {
+ return null;
+ }
+
+ if (oldRef != null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Allocated next memory segment [plcName=" + dataRegionCfg.name()
+ + ", chunkSize=" + IgniteUtils.readableSize(region.size(), true) + ']');
+ }
+ }
+
+ Segment[] newRef = new Segment[oldRef == null ? 1 : oldRef.length + 1];
+
+ if (oldRef != null) {
+ System.arraycopy(oldRef, 0, newRef, 0, oldRef.length);
+ }
+
+ Segment lastSeg = oldRef == null ? null : oldRef[oldRef.length - 1];
+
+ Segment allocated = new Segment(newRef.length - 1, region, lastSeg == null ? 0 : lastSeg.sumPages());
+
+ allocated.init();
+
+ newRef[newRef.length - 1] = allocated;
+
+ segments = newRef;
+ }
+
+ // Only this synchronized method writes to segments, so it is safe to read twice.
+ return segments[segments.length - 1];
+ }
+
+ private class Segment extends ReentrantReadWriteLock {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Segment index. */
+ private final int idx;
+
+ /** Direct memory chunk. */
+ private final DirectMemoryRegion region;
+
+ /** Last allocated page index. */
+ private long lastAllocatedIdxPtr;
+
+ /** Base address for all pages. */
+ private long pagesBase;
+
+ /** Capacity of all previous segments combined. */
+ private final int pagesInPrevSegments;
+
+ /** Segments capacity. */
+ private int maxPages;
+
+ /** Total number of currently acquired pages. */
+ private final AtomicInteger acquiredPages;
+
+ /**
+ * Constructor.
+ *
+ * @param idx Index.
+ * @param region Memory region to use.
+ * @param pagesInPrevSegments Number of pages in previously allocated segments.
+ */
+ private Segment(int idx, DirectMemoryRegion region, int pagesInPrevSegments) {
+ this.idx = idx;
+ this.region = region;
+ this.pagesInPrevSegments = pagesInPrevSegments;
+
+ acquiredPages = new AtomicInteger();
+ }
+
+ /**
+ * Initializes page memory segment.
+ */
+ private void init() {
+ long base = region.address();
+
+ lastAllocatedIdxPtr = base;
+
+ base += 8;
+
+ // Align by 8 bytes.
+ pagesBase = (base + 7) & ~0x7;
+
+ GridUnsafe.putLong(lastAllocatedIdxPtr, 0);
+
+ long limit = region.address() + region.size();
+
+ maxPages = (int) ((limit - pagesBase) / sysPageSize);
+ }
+
+ /**
+ * Acquires a page from the segment..
+ *
+ * @param pageIdx Page index.
+ * @return Page absolute pointer.
+ */
+ private long acquirePage(int pageIdx) {
+ long absPtr = absolute(pageIdx);
+
+ assert absPtr % 8 == 0 : absPtr;
+
+ if (trackAcquiredPages) {
+ acquiredPages.incrementAndGet();
+ }
+
+ return absPtr;
+ }
+
+ private void onPageRelease() {
+ acquiredPages.decrementAndGet();
+ }
+
+ /**
+ * Returns absolute pointer to the page with given index.
+ *
+ * @param pageIdx Page index.
+ * @return Absolute pointer.
+ */
+ private long absolute(int pageIdx) {
+ pageIdx &= IDX_MASK;
+
+ long off = ((long) pageIdx) * sysPageSize;
+
+ return pagesBase + off;
+ }
+
+ /**
+ * Converts page index into a sequence number.
+ *
+ * @param pageIdx Page index with encoded segment.
+ * @return Absolute page sequence number.
+ */
+ private int sequenceNumber(int pageIdx) {
+ pageIdx &= IDX_MASK;
+
+ return pagesInPrevSegments + pageIdx;
+ }
+
+ /**
+ * Returns a page sequence number upper bound.
+ */
+ private int sumPages() {
+ return pagesInPrevSegments + maxPages;
+ }
+
+ /**
+ * Returns a total number of currently acquired pages.
+ */
+ private int acquiredPages() {
+ return acquiredPages.get();
+ }
+
+ /**
+ * Allocates new page in the segment.
+ *
+ * @param tag Tag to initialize RW lock.
+ * @return Relative pointer of the allocated page or {@link #INVALID_REL_PTR} if segment is overflown.
+ */
+ private long allocateFreePage(int tag) {
+ long limit = region.address() + region.size();
+
+ while (true) {
+ long lastIdx = GridUnsafe.getLongVolatile(null, lastAllocatedIdxPtr);
+
+ // Check if we have enough space to allocate a page.
+ if (pagesBase + (lastIdx + 1) * sysPageSize > limit) {
+ return INVALID_REL_PTR;
+ }
+
+ if (GridUnsafe.compareAndSwapLong(null, lastAllocatedIdxPtr, lastIdx, lastIdx + 1)) {
+ long absPtr = pagesBase + lastIdx * sysPageSize;
+
+ assert lastIdx <= PageIdUtils.MAX_PAGE_NUM : lastIdx;
+
+ long pageIdx = fromSegmentIndex(idx, lastIdx);
+
+ assert pageIdx != INVALID_REL_PTR;
+
+ writePageId(absPtr, pageIdx);
+
+ GridUnsafe.putLong(absPtr, PAGE_MARKER);
+
+ rwLock.init(absPtr + LOCK_OFFSET, tag);
+
+ return pageIdx;
+ }
+ }
+ }
+
+ /**
+ * Checks if given sequence number belongs to the current segment.
+ *
+ * @param seqNo Page sequence number.
+ * @return {@code 0} if this segment contains the page with the given sequence number,
+ * {@code -1} if one of the previous segments contains the page with the given sequence number,
+ * {@code 1} if one of the next segments contains the page with the given sequence number.
+ */
+ public int containsPageBySequence(int seqNo) {
+ if (seqNo < pagesInPrevSegments) {
+ return -1;
+ } else if (seqNo < pagesInPrevSegments + maxPages) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+
+ /**
+ * Converts sequential page number to the page index.
+ *
+ * @param seqNo Page sequence number.
+ * @return Page index
+ */
+ public int pageIndex(int seqNo) {
+ return PageIdUtils.pageIndex(fromSegmentIndex(idx, seqNo - pagesInPrevSegments));
+ }
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
new file mode 100644
index 0000000..f8dfcdc
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem;
+
+/**
+ * Direct memory provider interface. Not thread-safe.
+ */
+public interface DirectMemoryProvider {
+ /**
+ * Initializes provider with the chunk sizes.
+ *
+ * @param chunkSizes Chunk sizes.
+ */
+ public void initialize(long[] chunkSizes);
+
+ /**
+ * Shuts down the provider.
+ *
+ * @param deallocate {@code True} to deallocate memory, {@code false} to allow memory reuse.
+ */
+ public void shutdown(boolean deallocate);
+
+ /**
+ * Attempts to allocate next memory region. Will return {@code null} if no more regions are available.
+ *
+ * @return Next memory region.
+ */
+ public DirectMemoryRegion nextRegion();
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryRegion.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryRegion.java
new file mode 100644
index 0000000..1070c96
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryRegion.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem;
+
+/**
+ * Interface for a direct memory region allocated by provider.
+ */
+public interface DirectMemoryRegion {
+ /**
+ * Returns a region start address.
+ */
+ public long address();
+
+ /**
+ * Returns a region size.
+ */
+ public long size();
+
+ /**
+ * Creates a sub-region of this region starting from the given offset.
+ *
+ * @param offset Offset within this region.
+ * @return Sub-region.
+ */
+ public DirectMemoryRegion slice(long offset);
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/IgniteOutOfMemoryException.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/IgniteOutOfMemoryException.java
new file mode 100644
index 0000000..a7e8647
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/IgniteOutOfMemoryException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem;
+
+import java.io.IOException;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Exception that signifies that there's no more available page in the data region.
+ *
+ * @see PageMemory#allocatePage(int, int, byte)
+ */
+public class IgniteOutOfMemoryException extends IgniteInternalException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Constructor.
+ */
+ public IgniteOutOfMemoryException() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param msg Error message.
+ */
+ public IgniteOutOfMemoryException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param msg Error message.
+ * @param e Cause exception.
+ */
+ public IgniteOutOfMemoryException(String msg, IOException e) {
+ super(msg, e);
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/MemoryAllocator.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/MemoryAllocator.java
new file mode 100644
index 0000000..4918610
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/MemoryAllocator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem;
+
+/**
+ * Base interface for offheap memory allocator.
+ */
+public interface MemoryAllocator {
+ /**
+ * Allocates memory.
+ *
+ * @param size Size of allocated memory.
+ *
+ * @return Pointer to memory or {@code 0} if failed.
+ */
+ public long allocateMemory(long size);
+
+ /**
+ * Deallocates memory.
+ *
+ * @param addr Address of memory.
+ */
+ public void freeMemory(long addr);
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeChunk.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeChunk.java
new file mode 100644
index 0000000..72c2c3c
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeChunk.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem.unsafe;
+
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Basic implementation of {@link DirectMemoryRegion} that stores direct memory address and the length of the region.
+ */
+public class UnsafeChunk implements DirectMemoryRegion {
+ /** Raw pointer. */
+ private long ptr;
+
+ /** Size of the chunk. */
+ private long len;
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer to the memory start.
+ * @param len Memory length.
+ */
+ public UnsafeChunk(long ptr, long len) {
+ this.ptr = ptr;
+ this.len = len;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long address() {
+ return ptr;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long size() {
+ return len;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DirectMemoryRegion slice(long offset) {
+ if (offset < 0 || offset >= len) {
+ throw new IllegalArgumentException("Failed to create a memory region slice [ptr=" + IgniteUtils.hexLong(ptr)
+ + ", len=" + len + ", offset=" + offset + ']');
+ }
+
+ return new UnsafeChunk(ptr + offset, len - offset);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return S.toString(UnsafeChunk.class, this);
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryAllocator.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryAllocator.java
new file mode 100644
index 0000000..784424a
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryAllocator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem.unsafe;
+
+import org.apache.ignite.internal.pagememory.mem.MemoryAllocator;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/** Memory allocator implementation that uses {@link GridUnsafe}. */
+public class UnsafeMemoryAllocator implements MemoryAllocator {
+ /** {@inheritDoc} */
+ @Override
+ public long allocateMemory(long size) {
+ return GridUnsafe.allocateMemory(size);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void freeMemory(long addr) {
+ GridUnsafe.freeMemory(addr);
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryProvider.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryProvider.java
new file mode 100644
index 0000000..1b23185
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryProvider.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem.unsafe;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.MemoryAllocator;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Memory provider implementation based on unsafe memory access.
+ *
+ * <p>Supports memory reuse semantics.
+ */
+public class UnsafeMemoryProvider implements DirectMemoryProvider {
+ /** Logger.*/
+ private static final IgniteLogger LOG = IgniteLogger.forClass(UnsafeMemoryProvider.class);
+
+ /** Array with memory regions sizes. */
+ private long[] sizes;
+
+ /** List of allocated memory regions. */
+ private List<DirectMemoryRegion> regions;
+
+ /** Flag shows if current memory provider have been already initialized. */
+ private boolean isInit;
+
+ /** Number of used data regions in {@link #regions} list. */
+ private int used = 0;
+
+ /** Memory allocator. */
+ private final MemoryAllocator allocator;
+
+ /**
+ * Public constructor.
+ *
+ * @param allocator Memory allocator. If {@code null}, default {@link UnsafeMemoryAllocator} will be used.
+ */
+ public UnsafeMemoryProvider(@Nullable MemoryAllocator allocator) {
+ this.allocator = allocator == null ? new UnsafeMemoryAllocator() : allocator;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(long[] sizes) {
+ if (isInit) {
+ return;
+ }
+
+ this.sizes = sizes;
+
+ regions = new ArrayList<>();
+
+ isInit = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void shutdown(boolean deallocate) {
+ if (!deallocate) {
+ used = 0;
+
+ return;
+ }
+
+ if (regions != null) {
+ for (Iterator<DirectMemoryRegion> it = regions.iterator(); it.hasNext(); ) {
+ DirectMemoryRegion chunk = it.next();
+
+ allocator.freeMemory(chunk.address());
+
+ // Safety.
+ it.remove();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public DirectMemoryRegion nextRegion() {
+ if (used == sizes.length) {
+ return null;
+ }
+
+ if (used < regions.size()) {
+ return regions.get(used++);
+ }
+
+ long chunkSize = sizes[regions.size()];
+
+ long ptr;
+
+ try {
+ ptr = allocator.allocateMemory(chunkSize);
+ } catch (IllegalArgumentException e) {
+ String msg = "Failed to allocate next memory chunk: " + IgniteUtils.readableSize(chunkSize, true)
+ + ". Check if chunkSize is too large and 32-bit JVM is used.";
+
+ if (regions.isEmpty()) {
+ throw new IgniteInternalException(msg, e);
+ }
+
+ LOG.error(msg);
+
+ return null;
+ }
+
+ if (ptr <= 0) {
+ LOG.error("Failed to allocate next memory chunk: " + IgniteUtils.readableSize(chunkSize, true));
+
+ return null;
+ }
+
+ DirectMemoryRegion region = new UnsafeChunk(ptr, chunkSize);
+
+ regions.add(region);
+
+ used++;
+
+ return region;
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/metric/IoStatisticsHolderNoOp.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/metric/IoStatisticsHolderNoOp.java
new file mode 100644
index 0000000..3816419
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/metric/IoStatisticsHolderNoOp.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.metric;
+
+/**
+ * No Operation IO statistics holder. Use in case statistics shouldn't be gathered.
+ */
+public class IoStatisticsHolderNoOp implements IoStatisticsHolder {
+ /** No-op statistics. */
+ public static final IoStatisticsHolderNoOp INSTANCE = new IoStatisticsHolderNoOp();
+
+ /**
+ * Private constructor.
+ */
+ private IoStatisticsHolderNoOp() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void trackLogicalRead(long pageAddr) {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void trackPhysicalAndLogicalRead(long pageAddr) {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long logicalReads() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long physicalReads() {
+ return 0;
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
index 21f33c8..6171af1 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
@@ -45,11 +45,8 @@
public static class TestPageIo extends PageIo {
/**
* Constructor.
- *
- * @param type Page type.
- * @param ver Page format version.
*/
- protected TestPageIo() {
+ public TestPageIo() {
super(TEST_PAGE_TYPE, TEST_PAGE_VER);
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
new file mode 100644
index 0000000..3678af3
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.impl;
+
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.TestPageIoModule.TestPageIo;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests {@link PageMemoryNoStoreImpl}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
+ protected static final int PAGE_SIZE = 8 * 1024;
+
+ private static final int MAX_MEMORY_SIZE = 10 * 1024 * 1024;
+
+ private static final PageIo PAGE_IO = new TestPageIo();
+
+ @InjectConfiguration(
+ value = "mock.type = pagemem",
+ polymorphicExtensions = {
+ PageMemoryDataRegionConfigurationSchema.class,
+ UnsafeMemoryAllocatorConfigurationSchema.class
+ })
+ private DataRegionConfiguration dataRegionCfg;
+
+ @Test
+ public void testPageTearingInner() throws Exception {
+ PageMemory mem = memory();
+
+ mem.start();
+
+ try {
+ FullPageId fullId1 = allocatePage(mem);
+ FullPageId fullId2 = allocatePage(mem);
+
+ long page1 = mem.acquirePage(fullId1.groupId(), fullId1.pageId());
+
+ try {
+ long page2 = mem.acquirePage(fullId2.groupId(), fullId2.pageId());
+
+ log.info("Allocated pages [page1Id=" + fullId1.pageId() + ", page1=" + page1
+ + ", page2Id=" + fullId2.pageId() + ", page2=" + page2 + ']');
+
+ try {
+ writePage(mem, fullId1, page1, 1);
+ writePage(mem, fullId2, page2, 2);
+
+ readPage(mem, fullId1.pageId(), page1, 1);
+ readPage(mem, fullId2.pageId(), page2, 2);
+
+ // Check read after read.
+ readPage(mem, fullId1.pageId(), page1, 1);
+ readPage(mem, fullId2.pageId(), page2, 2);
+ } finally {
+ mem.releasePage(fullId2.groupId(), fullId2.pageId(), page2);
+ }
+ } finally {
+ mem.releasePage(fullId1.groupId(), fullId1.pageId(), page1);
+ }
+ } finally {
+ mem.stop(true);
+ }
+ }
+
+ @Test
+ public void testLoadedPagesCount() throws Exception {
+ PageMemory mem = memory();
+
+ mem.start();
+
+ int expPages = MAX_MEMORY_SIZE / mem.systemPageSize();
+
+ try {
+ for (int i = 0; i < expPages * 2; i++) {
+ allocatePage(mem);
+ }
+ } catch (IgniteOutOfMemoryException e) {
+ log.error(e.getMessage(), e);
+
+ // Expected.
+ assertEquals(mem.loadedPages(), expPages);
+ } finally {
+ mem.stop(true);
+ }
+ }
+
+ @Test
+ public void testPageTearingSequential() throws Exception {
+ PageMemory mem = memory();
+
+ mem.start();
+
+ try {
+ int pagesCnt = 1024;
+
+ List<FullPageId> pages = new ArrayList<>(pagesCnt);
+
+ for (int i = 0; i < pagesCnt; i++) {
+ FullPageId fullId = allocatePage(mem);
+
+ pages.add(fullId);
+
+ long page = mem.acquirePage(fullId.groupId(), fullId.pageId());
+
+ try {
+ if (i % 64 == 0) {
+ log.info("Writing page [idx=" + i + ", pageId=" + fullId.pageId() + ", page=" + page + ']');
+ }
+
+ writePage(mem, fullId, page, i + 1);
+ } finally {
+ mem.releasePage(fullId.groupId(), fullId.pageId(), page);
+ }
+ }
+
+ for (int i = 0; i < pagesCnt; i++) {
+ FullPageId fullId = pages.get(i);
+
+ long page = mem.acquirePage(fullId.groupId(), fullId.pageId());
+
+ try {
+ if (i % 64 == 0) {
+ log.info("Reading page [idx=" + i + ", pageId=" + fullId.pageId() + ", page=" + page + ']');
+ }
+
+ readPage(mem, fullId.pageId(), page, i + 1);
+ } finally {
+ mem.releasePage(fullId.groupId(), fullId.pageId(), page);
+ }
+ }
+ } finally {
+ mem.stop(true);
+ }
+ }
+
+ @Test
+ public void testPageHandleDeallocation() throws Exception {
+ PageMemory mem = memory();
+
+ mem.start();
+
+ try {
+ int pages = 3 * 1024 * 1024 / (8 * 1024);
+
+ Collection<FullPageId> handles = new HashSet<>();
+
+ for (int i = 0; i < pages; i++) {
+ handles.add(allocatePage(mem));
+ }
+
+ for (FullPageId fullId : handles) {
+ mem.freePage(fullId.groupId(), fullId.pageId());
+ }
+
+ for (int i = 0; i < pages; i++) {
+ assertFalse(handles.add(allocatePage(mem)));
+ }
+ } finally {
+ mem.stop(true);
+ }
+ }
+
+ @Test
+ public void testPageIdRotation() throws Exception {
+ PageMemory mem = memory();
+
+ mem.start();
+
+ try {
+ int pages = 5;
+
+ Collection<FullPageId> old = new ArrayList<>();
+ Collection<FullPageId> updated = new ArrayList<>();
+
+ for (int i = 0; i < pages; i++) {
+ old.add(allocatePage(mem));
+ }
+
+ // Check that initial pages are accessible.
+ for (FullPageId id : old) {
+ long pageApsPtr = mem.acquirePage(id.groupId(), id.pageId());
+ try {
+ long pageAddr = mem.writeLock(id.groupId(), id.pageId(), pageApsPtr);
+
+ assertNotNull(pageAddr);
+
+ try {
+ PAGE_IO.initNewPage(pageAddr, id.pageId(), mem.realPageSize(id.groupId()));
+
+ long updId = PageIdUtils.rotatePageId(id.pageId());
+
+ PageIo.setPageId(pageAddr, updId);
+
+ updated.add(new FullPageId(updId, id.groupId()));
+ } finally {
+ mem.writeUnlock(id.groupId(), id.pageId(), pageApsPtr, true);
+ }
+ } finally {
+ mem.releasePage(id.groupId(), id.pageId(), pageApsPtr);
+ }
+ }
+
+ // Check that updated pages are inaccessible using old IDs.
+ for (FullPageId id : old) {
+ long pageApsPtr = mem.acquirePage(id.groupId(), id.pageId());
+ try {
+ long pageAddr = mem.writeLock(id.groupId(), id.pageId(), pageApsPtr);
+
+ if (pageAddr != 0L) {
+ mem.writeUnlock(id.groupId(), id.pageId(), pageApsPtr, false);
+
+ fail("Was able to acquire page write lock.");
+ }
+
+ mem.readLock(id.groupId(), id.pageId(), pageApsPtr);
+
+ if (pageAddr != 0) {
+ mem.readUnlock(id.groupId(), id.pageId(), pageApsPtr);
+
+ fail("Was able to acquire page read lock.");
+ }
+ } finally {
+ mem.releasePage(id.groupId(), id.pageId(), pageApsPtr);
+ }
+ }
+
+ // Check that updated pages are accessible using new IDs.
+ for (FullPageId id : updated) {
+ long pageApsPtr = mem.acquirePage(id.groupId(), id.pageId());
+ try {
+ long pageAddr = mem.writeLock(id.groupId(), id.pageId(), pageApsPtr);
+
+ assertNotSame(0L, pageAddr);
+
+ try {
+ assertEquals(id.pageId(), PageIo.getPageId(pageAddr));
+ } finally {
+ mem.writeUnlock(id.groupId(), id.pageId(), pageApsPtr, false);
+ }
+
+ pageAddr = mem.readLock(id.groupId(), id.pageId(), pageApsPtr);
+
+ assertNotSame(0L, pageAddr);
+
+ try {
+ assertEquals(id.pageId(), PageIo.getPageId(pageAddr));
+ } finally {
+ mem.readUnlock(id.groupId(), id.pageId(), pageApsPtr);
+ }
+ } finally {
+ mem.releasePage(id.groupId(), id.pageId(), pageApsPtr);
+ }
+ }
+ } finally {
+ mem.stop(true);
+ }
+ }
+
+ /**
+ * Creates new page memory instance.
+ *
+ * @return Page memory implementation.
+ * @throws Exception If failed.
+ */
+ protected PageMemory memory() throws Exception {
+ dataRegionCfg.change(cfg ->
+ cfg.convert(PageMemoryDataRegionChange.class)
+ .changePageSize(PAGE_SIZE)
+ .changeInitSize(MAX_MEMORY_SIZE)
+ .changeMaxSize(MAX_MEMORY_SIZE)
+ );
+
+ DirectMemoryProvider provider = new UnsafeMemoryProvider(null);
+
+ PageIoRegistry ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ return new PageMemoryNoStoreImpl(
+ provider,
+ (PageMemoryDataRegionConfiguration) fixConfiguration(dataRegionCfg),
+ ioRegistry
+ );
+ }
+
+ /**
+ * Fills page with passed value.
+ *
+ * @param mem Page memory.
+ * @param fullId Page ID.
+ * @param page Page pointer.
+ * @param val Value to write.
+ */
+ private void writePage(PageMemory mem, FullPageId fullId, long page, int val) {
+ long pageAddr = mem.writeLock(-1, fullId.pageId(), page);
+
+ try {
+ PAGE_IO.initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId()));
+
+ for (int i = PageIo.COMMON_HEADER_END; i < PAGE_SIZE; i++) {
+ PageUtils.putByte(pageAddr, i, (byte) val);
+ }
+ } finally {
+ mem.writeUnlock(-1, fullId.pageId(), page, true);
+ }
+ }
+
+ /**
+ * Reads a page from page memory and asserts that it is full of expected values.
+ *
+ * @param mem Page memory.
+ * @param pageId Page ID.
+ * @param page Page pointer.
+ * @param expVal Expected value.
+ */
+ private void readPage(PageMemory mem, long pageId, long page, int expVal) {
+ expVal &= 0xFF;
+
+ long pageAddr = mem.readLock(-1, pageId, page);
+
+ assert pageAddr != 0;
+
+ try {
+ for (int i = PageIo.COMMON_HEADER_END; i < PAGE_SIZE; i++) {
+ int val = PageUtils.getByte(pageAddr, i) & 0xFF;
+
+ assertEquals(expVal, val, "Unexpected value at position: " + i);
+ }
+ } finally {
+ mem.readUnlock(-1, pageId, page);
+ }
+ }
+
+ /**
+ * Allocates page.
+ *
+ * @param mem Memory.
+ * @return Page.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static FullPageId allocatePage(PageIdAllocator mem) throws IgniteInternalCheckedException {
+ return new FullPageId(mem.allocatePage(-1, 1, PageIdAllocator.FLAG_DATA), -1);
+ }
+}
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
index f8c9964..0d46a52 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
@@ -30,6 +30,7 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
@@ -80,7 +81,10 @@
Map.of(TableValidator.class, Set.of(TableValidatorImpl.INSTANCE)),
new TestConfigurationStorage(DISTRIBUTED),
List.of(),
- List.of(HashIndexConfigurationSchema.class, SortedIndexConfigurationSchema.class, PartialIndexConfigurationSchema.class)
+ List.of(
+ HashIndexConfigurationSchema.class, SortedIndexConfigurationSchema.class, PartialIndexConfigurationSchema.class,
+ RocksDbDataRegionConfigurationSchema.class
+ )
);
confRegistry.start();
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
index 62fc4d3..07522d5 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
@@ -34,6 +34,9 @@
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
import org.apache.ignite.configuration.schemas.store.DataStorageView;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexChange;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
@@ -72,7 +75,9 @@
/** Tests that validator finds no issues in a simple valid configuration. */
@Test
- public void testNoIssues(@InjectConfiguration DataStorageConfiguration dbCfg) {
+ public void testNoIssues(
+ @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataStorageConfiguration dbCfg
+ ) {
ValidationContext<NamedListView<TableView>> ctx = mockContext(null, dbCfg.value());
ArgumentCaptor<ValidationIssue> issuesCaptor = validate(ctx);
@@ -82,7 +87,9 @@
/** Tests that the validator catches nonexistent data regions. */
@Test
- public void testMissingDataRegion(@InjectConfiguration DataStorageConfiguration dbCfg) throws Exception {
+ public void testMissingDataRegion(
+ @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataStorageConfiguration dbCfg
+ ) throws Exception {
tablesCfg.tables().get("table").dataRegion().update("r0").get(1, TimeUnit.SECONDS);
ValidationContext<NamedListView<TableView>> ctx = mockContext(null, dbCfg.value());
@@ -100,7 +107,13 @@
/** Tests that new data region must have the same type. */
@Test
public void testChangeDataRegionType(
- @InjectConfiguration("mock.regions.r0.type = foo") DataStorageConfiguration dbCfg
+ @InjectConfiguration(
+ value = "mock.regions.r0.type = pagemem",
+ polymorphicExtensions = {
+ RocksDbDataRegionConfigurationSchema.class, PageMemoryDataRegionConfigurationSchema.class,
+ UnsafeMemoryAllocatorConfigurationSchema.class
+ })
+ DataStorageConfiguration dbCfg
) throws Exception {
NamedListView<TableView> oldValue = tablesCfg.tables().value();
@@ -114,7 +127,7 @@
assertEquals(
"Unable to move table 'schema.table' from region 'default' to region 'r0' because it has"
- + " different type (old=rocksdb, new=foo)",
+ + " different type (old=rocksdb, new=pagemem)",
issuesCaptor.getValue().message()
);
}
@@ -123,7 +136,9 @@
* Tests that column names and column keys inside a Named List must be equal.
*/
@Test
- void testMisalignedColumnNamedListKeys(@InjectConfiguration DataStorageConfiguration dbCfg) {
+ void testMisalignedColumnNamedListKeys(
+ @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataStorageConfiguration dbCfg
+ ) {
NamedListView<TableView> oldValue = tablesCfg.tables().value();
TableConfiguration tableCfg = tablesCfg.tables().get("table");
@@ -153,7 +168,9 @@
* Tests that index names and index keys inside a Named List must be equal.
*/
@Test
- void testMisalignedIndexNamedListKeys(@InjectConfiguration DataStorageConfiguration dbCfg) {
+ void testMisalignedIndexNamedListKeys(
+ @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataStorageConfiguration dbCfg
+ ) {
NamedListView<TableView> oldValue = tablesCfg.tables().value();
TableConfiguration tableCfg = tablesCfg.tables().get("table");
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlWithMockedManagersTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlWithMockedManagersTest.java
index 8b29299..b558c62 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlWithMockedManagersTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlWithMockedManagersTest.java
@@ -31,6 +31,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
@@ -112,7 +113,7 @@
private TablesConfiguration tblsCfg;
/** Data storage configuration. */
- @InjectConfiguration
+ @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class)
private DataStorageConfiguration dataStorageCfg;
TableManager tblManager;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
index b6a1dc3..de246af 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.storage.rocksdb;
-import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_CLOCK_CACHE;
-import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
-import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_LRU_CACHE;
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_CLOCK_CACHE;
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_LRU_CACHE;
import java.util.Locale;
-import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
-import org.apache.ignite.configuration.schemas.store.DataRegionView;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionView;
import org.apache.ignite.internal.storage.engine.DataRegion;
import org.apache.ignite.internal.util.IgniteUtils;
import org.rocksdb.Cache;
@@ -36,7 +36,7 @@
*/
public class RocksDbDataRegion implements DataRegion {
/** Region configuration. */
- private final DataRegionConfiguration cfg;
+ private final RocksDbDataRegionConfiguration cfg;
/** RocksDB cache instance. */
private Cache cache;
@@ -49,7 +49,7 @@
*
* @param cfg Data region configuration.
*/
- public RocksDbDataRegion(DataRegionConfiguration cfg) {
+ public RocksDbDataRegion(RocksDbDataRegionConfiguration cfg) {
this.cfg = cfg;
assert ROCKSDB_DATA_REGION_TYPE.equalsIgnoreCase(cfg.type().value());
@@ -58,7 +58,7 @@
/** {@inheritDoc} */
@Override
public void start() {
- DataRegionView dataRegionView = cfg.value();
+ RocksDbDataRegionView dataRegionView = (RocksDbDataRegionView) cfg.value();
long writeBufferSize = dataRegionView.writeBufferSize();
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 942ad53..56acd89 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -22,6 +22,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfiguration;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.DataRegion;
@@ -59,7 +60,9 @@
/** {@inheritDoc} */
@Override
public DataRegion createDataRegion(DataRegionConfiguration regionCfg) {
- return new RocksDbDataRegion(regionCfg);
+ assert regionCfg instanceof RocksDbDataRegionConfiguration : regionCfg;
+
+ return new RocksDbDataRegion((RocksDbDataRegionConfiguration) regionCfg);
}
/** {@inheritDoc} */
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorageTest.java
index a91e5e6..c55784a 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorageTest.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal.storage.rocksdb;
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import java.nio.file.Path;
import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -56,10 +59,14 @@
@BeforeEach
public void setUp(
@WorkDirectory Path workDir,
- @InjectConfiguration DataRegionConfiguration dataRegionCfg,
+ @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataRegionConfiguration dataRegionCfg,
@InjectConfiguration(polymorphicExtensions = HashIndexConfigurationSchema.class) TableConfiguration tableCfg
) throws Exception {
- dataRegionCfg.change(cfg -> cfg.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)).get();
+ dataRegionCfg.change(cfg ->
+ cfg.convert(RocksDbDataRegionChange.class).changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)
+ ).get();
+
+ dataRegionCfg = fixConfiguration(dataRegionCfg);
dataRegion = engine.createDataRegion(dataRegionCfg);
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
index dea4a40..31d0be6 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.storage.rocksdb;
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -32,6 +33,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -67,10 +70,12 @@
@BeforeEach
public void setUp(
- @InjectConfiguration DataRegionConfiguration dataRegionCfg,
+ @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataRegionConfiguration dataRegionCfg,
@InjectConfiguration(polymorphicExtensions = HashIndexConfigurationSchema.class) TableConfiguration tableCfg
) throws Exception {
- CompletableFuture<Void> changeFuture = dataRegionCfg.change(cfg -> cfg.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024));
+ CompletableFuture<Void> changeFuture = dataRegionCfg.change(cfg ->
+ cfg.convert(RocksDbDataRegionChange.class).changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)
+ );
assertThat(changeFuture, willBe(nullValue(Void.class)));
@@ -78,6 +83,8 @@
assertThat(changeFuture, willBe(nullValue(Void.class)));
+ dataRegionCfg = fixConfiguration(dataRegionCfg);
+
dataRegion = engine.createDataRegion(dataRegionCfg);
assertThat(dataRegion, is(instanceOf(RocksDbDataRegion.class)));
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
index 140c3df..768e189 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
@@ -19,6 +19,7 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
import static org.apache.ignite.internal.schema.SchemaTestUtils.generateRandomValue;
import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
@@ -47,6 +48,8 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
@@ -116,7 +119,10 @@
private final List<AutoCloseable> resources = new ArrayList<>();
@BeforeEach
- void setUp(@WorkDirectory Path workDir, @InjectConfiguration DataRegionConfiguration dataRegionCfg) {
+ void setUp(
+ @WorkDirectory Path workDir,
+ @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataRegionConfiguration dataRegionCfg
+ ) {
long seed = System.currentTimeMillis();
log.info("Using random seed: " + seed);
@@ -125,6 +131,8 @@
createTestConfiguration(dataRegionCfg);
+ dataRegionCfg = fixConfiguration(dataRegionCfg);
+
var engine = new RocksDbStorageEngine();
engine.start();
@@ -152,7 +160,7 @@
*/
private void createTestConfiguration(DataRegionConfiguration dataRegionCfg) {
CompletableFuture<Void> dataRegionChangeFuture = dataRegionCfg
- .change(cfg -> cfg.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024));
+ .change(cfg -> cfg.convert(RocksDbDataRegionChange.class).changeSize(16 * 1024).changeWriteBufferSize(16 * 1024));
assertThat(dataRegionChangeFuture, willBe(nullValue(Void.class)));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 095c25a..96c8e54 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -43,6 +43,7 @@
import java.util.concurrent.Phaser;
import java.util.function.Consumer;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
@@ -140,7 +141,7 @@
private TablesConfiguration tblsCfg;
/** Data storage configuration. */
- @InjectConfiguration
+ @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class)
private DataStorageConfiguration dataStorageCfg;
/** Test node. */