blob: b9420f6acf90e70b6370f727631477344b205f19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.NodeMode.CLIENT;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.NodeMode.SERVER;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.TxEndResult.COMMIT;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.TxEndResult.ROLLBAK;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
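/** Tests transaction recovery after a near or primary node leaves the cluster, mostly on {@code TRANSACTIONAL_SNAPSHOT} caches. */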
public class CacheMvccTxRecoveryTest extends CacheMvccAbstractTest {
/** Expected final outcome of the transaction under test once recovery has finished. */
public enum TxEndResult {
/** The transaction is expected to end up in the {@code COMMITTED} state. */
COMMIT,
/**
 * The transaction is expected to end up in the {@code ROLLED_BACK} state.
 * The constant name is misspelled; it is referenced through a static import, so it is kept as is.
 */
ROLLBAK
}
/** Mode in which the near (transaction-originating) node is started by the near-failure scenarios. */
public enum NodeMode {
/** The near node is started as a server node. */
SERVER,
/** The near node is started as a client node. */
CLIENT
}
/**
 * {@inheritDoc}
 * <p>
 * Always throws a {@link RuntimeException}: the tests in this class build their cache configurations
 * explicitly and are not expected to ask for a default cache mode.
 */
@Override protected CacheMode cacheMode() {
throw new RuntimeException("Is not supposed to be used");
}
/**
 * {@inheritDoc}
 * <p>
 * Installs a {@link TestRecordingCommunicationSpi} on every started node so that the tests can block
 * selected messages.
 */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
    IgniteConfiguration igniteCfg = super.getConfiguration(gridName);

    igniteCfg.setCommunicationSpi(new TestRecordingCommunicationSpi());

    return igniteCfg;
}
/**
 * Checks that a fully prepared transaction ends up committed on the remaining nodes after its near node,
 * started in client mode, is stopped.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryCommitNearFailure1() throws Exception {
checkRecoveryNearFailure(COMMIT, CLIENT);
}
/**
 * Checks that a fully prepared transaction ends up committed on the remaining nodes after its near node,
 * started in server mode, is stopped.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryCommitNearFailure2() throws Exception {
checkRecoveryNearFailure(COMMIT, SERVER);
}
/**
 * Checks that a transaction whose near prepare request to one server was blocked ends up rolled back
 * after its near node, started in client mode, is stopped.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryRollbackNearFailure1() throws Exception {
checkRecoveryNearFailure(ROLLBAK, CLIENT);
}
/**
 * Checks that a transaction whose near prepare request to one server was blocked ends up rolled back
 * after its near node, started in server mode, is stopped.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryRollbackNearFailure2() throws Exception {
checkRecoveryNearFailure(ROLLBAK, SERVER);
}
/**
 * Checks that a transaction ends up committed on the surviving nodes when the primary node {@code grid(1)}
 * is stopped while its finish response to the near node is blocked.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryCommitPrimaryFailure1() throws Exception {
checkRecoveryPrimaryFailure(COMMIT, false);
}
/**
 * Checks that a transaction ends up rolled back on the surviving nodes when the primary node {@code grid(1)}
 * is stopped while its DHT prepare request to its backup is blocked.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryRollbackPrimaryFailure1() throws Exception {
checkRecoveryPrimaryFailure(ROLLBAK, false);
}
/**
 * Checks that a transaction ends up committed on the surviving nodes when the primary node {@code grid(0)}
 * (the {@code mvccCrd = true} variant) is stopped while its finish response to the near node is blocked.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryCommitPrimaryFailure2() throws Exception {
checkRecoveryPrimaryFailure(COMMIT, true);
}
/**
 * Checks that a transaction ends up rolled back on the surviving nodes when the primary node {@code grid(0)}
 * (the {@code mvccCrd = true} variant) is stopped while its DHT prepare request to its backup is blocked.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryRollbackPrimaryFailure2() throws Exception {
checkRecoveryPrimaryFailure(ROLLBAK, true);
}
/**
 * Runs a near-node failure scenario: three server nodes plus one near node are started, a pessimistic
 * transaction inserting two keys is prepared from the near node, the near node is closed and the
 * transactions left on the three base nodes are expected to reach the requested final state.
 *
 * @param endRes Expected outcome; {@code COMMIT} lets the prepare finish, otherwise the near prepare request
 *      to {@code grid(1)} is blocked.
 * @param nearNodeMode Whether the near node is started as a client or as a server.
 * @throws Exception If failed.
 */
private void checkRecoveryNearFailure(TxEndResult endRes, NodeMode nearNodeMode) throws Exception {
int gridCnt = 4;
int baseCnt = gridCnt - 1;
boolean commit = endRes == COMMIT;
startGridsMultiThreaded(baseCnt);
// The 'client' flag decides whether the near node started next is a client or a server.
client = nearNodeMode == CLIENT;
IgniteEx nearNode = startGrid(baseCnt);
IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg()
.setBackups(1));
Affinity<Object> aff = nearNode.affinity(DEFAULT_CACHE_NAME);
List<Integer> keys = new ArrayList<>();
// First key: primary on grid(0), backup on grid(1).
for (int i = 0; i < 100; i++) {
if (aff.isPrimary(grid(0).localNode(), i) && aff.isBackup(grid(1).localNode(), i)) {
keys.add(i);
break;
}
}
// Second key: primary on grid(1), backup on grid(2).
for (int i = 0; i < 100; i++) {
if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) {
keys.add(i);
break;
}
}
assert keys.size() == 2;
TestRecordingCommunicationSpi nearComm
= (TestRecordingCommunicationSpi)nearNode.configuration().getCommunicationSpi();
// Rollback scenario: near prepare requests addressed to grid(1) are held back.
if (!commit)
nearComm.blockMessages(GridNearTxPrepareRequest.class, grid(1).name());
GridTestUtils.runAsync(() -> {
// run in separate thread to exclude tx from thread-local map
GridNearTxLocal nearTx
= ((TransactionProxyImpl)nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();
for (Integer k : keys)
cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));
// Transactions active on the three base nodes; txsOnNode also checks they belong to nearTx.
List<IgniteInternalTx> txs = IntStream.range(0, baseCnt)
.mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
IgniteInternalFuture<?> prepareFut = nearTx.prepareNearTxLocal();
// Commit scenario waits for the whole prepare; rollback scenario only waits until some tx is PREPARED.
if (commit)
prepareFut.get();
else
assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED));
// drop near
nearNode.close();
assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == (commit ? COMMITTED : ROLLED_BACK)));
return null;
}).get();
if (commit) {
// Both inserted rows must eventually be visible through grid(0).
assertConditionEventually(() -> {
int rowsCnt = grid(0).cache(DEFAULT_CACHE_NAME)
.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
return rowsCnt == keys.size();
});
}
else {
// Nothing must be visible after the rollback.
int rowsCnt = G.allGrids().get(0).cache(DEFAULT_CACHE_NAME)
.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
assertEquals(0, rowsCnt);
}
// Update counters of the touched partitions must agree on all three base nodes.
assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> true));
}
/**
 * Runs a primary-node failure scenario: three server nodes plus a client near node are started, a pessimistic
 * transaction inserting two keys is committed asynchronously from the client, one primary node (the victim)
 * is closed and the transactions on the other server nodes are expected to reach the requested final state.
 *
 * @param endRes Expected outcome; for {@code COMMIT} the victim's finish response to the near node is blocked,
 *      otherwise the victim's DHT prepare request to its backup is blocked.
 * @param mvccCrd If {@code true} the victim is {@code grid(0)} with backup {@code grid(1)}, otherwise the
 *      victim is {@code grid(1)} with backup {@code grid(2)}. NOTE(review): the name suggests {@code grid(0)}
 *      is the MVCC coordinator - confirm, this file does not show it.
 * @throws Exception If failed.
 */
private void checkRecoveryPrimaryFailure(TxEndResult endRes, boolean mvccCrd) throws Exception {
int gridCnt = 4;
int baseCnt = gridCnt - 1;
boolean commit = endRes == COMMIT;
startGridsMultiThreaded(baseCnt);
// The near node is always a client in this scenario.
client = true;
IgniteEx nearNode = startGrid(baseCnt);
IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg()
.setBackups(1));
Affinity<Object> aff = nearNode.affinity(DEFAULT_CACHE_NAME);
List<Integer> keys = new ArrayList<>();
// First key: primary on grid(0), backup on grid(1).
for (int i = 0; i < 100; i++) {
if (aff.isPrimary(grid(0).localNode(), i) && aff.isBackup(grid(1).localNode(), i)) {
keys.add(i);
break;
}
}
// Second key: primary on grid(1), backup on grid(2).
for (int i = 0; i < 100; i++) {
if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) {
keys.add(i);
break;
}
}
assert keys.size() == 2;
// Indexes of the node to be stopped and of the backup of the key it is primary for.
int victim, victimBackup;
if (mvccCrd) {
victim = 0;
victimBackup = 1;
}
else {
victim = 1;
victimBackup = 2;
}
TestRecordingCommunicationSpi victimComm = (TestRecordingCommunicationSpi)grid(victim).configuration().getCommunicationSpi();
// Commit scenario: hold the finish response to the near node. Rollback scenario: hold the prepare to the backup.
if (commit)
victimComm.blockMessages(GridNearTxFinishResponse.class, nearNode.name());
else
victimComm.blockMessages(GridDhtTxPrepareRequest.class, grid(victimBackup).name());
GridNearTxLocal nearTx
= ((TransactionProxyImpl)nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();
for (Integer k : keys)
cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));
// Transactions active on the server nodes other than the victim.
List<IgniteInternalTx> txs = IntStream.range(0, baseCnt)
.filter(i -> i != victim)
.mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
IgniteInternalFuture<IgniteInternalTx> commitFut = nearTx.commitAsync();
if (commit)
assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == COMMITTED));
else
assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED));
// drop victim
grid(victim).close();
awaitPartitionMapExchange();
assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == (commit ? COMMITTED : ROLLED_BACK)));
// Sanity check: the message selected above was actually intercepted on the victim.
assert victimComm.hasBlockedMessages();
if (commit) {
// Both inserted rows must eventually be visible on a surviving node.
assertConditionEventually(() -> {
int rowsCnt = G.allGrids().get(0).cache(DEFAULT_CACHE_NAME)
.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
return rowsCnt == keys.size();
});
}
else {
// Nothing must be visible after the rollback.
int rowsCnt = G.allGrids().get(0).cache(DEFAULT_CACHE_NAME)
.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
assertEquals(0, rowsCnt);
}
// The asynchronous commit started on the near node must have completed by now.
assertTrue(commitFut.isDone());
// Update counters of the touched partitions must agree on the surviving server nodes.
assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> i != victim));
}
/**
 * Prepares a transaction touching one key per server node from a client, stops the client and checks
 * that both rows become visible and that partition update counters agree on the remaining nodes.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryCommit() throws Exception {
    startGridsMultiThreaded(2);

    client = true;

    IgniteEx nearNode = startGrid(2);

    IgniteCache<Object, Object> nearCache = nearNode.getOrCreateCache(basicCcfg());

    AtomicInteger keySeq = new AtomicInteger();

    ArrayList<Integer> txKeys = new ArrayList<>();

    // One key per server node.
    for (ClusterNode srv : nearNode.cluster().forServers().nodes())
        txKeys.add(keyForNode(nearNode.affinity(DEFAULT_CACHE_NAME), keySeq, srv));

    GridTestUtils.runAsync(() -> {
        // run in separate thread to exclude tx from thread-local map
        Transaction tx = nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);

        for (Integer key : txKeys) {
            SqlFieldsQuery insert = new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(key);

            nearCache.query(insert);
        }

        GridNearTxLocal nearTx = ((TransactionProxyImpl)tx).tx();

        nearTx.prepareNearTxLocal().get();

        return null;
    }).get();

    // drop near
    stopGrid(2, true);

    IgniteEx survivor = grid(0);

    assertConditionEventually(() -> {
        int rowsCnt = survivor.cache(DEFAULT_CACHE_NAME)
            .query(new SqlFieldsQuery("select * from Integer")).getAll().size();

        return rowsCnt == 2;
    });

    assertPartitionCountersAreConsistent(txKeys, G.allGrids());
}
/**
 * Starts four servers and a client, blocks the DHT prepare request from the primary node {@code grid(3)}
 * to {@code grid(0)}, commits a two-key transaction asynchronously and, once it is partially prepared,
 * stops the primary while an unrelated transaction is kept open in the background. The transactions on the
 * surviving servers are expected to be rolled back, partition update counters must agree on the survivors
 * and no rows may be visible afterwards.
 *
 * @throws Exception if failed.
 */
@Test
public void testCountersNeighborcastServerFailed() throws Exception {
    // Reopen https://issues.apache.org/jira/browse/IGNITE-10766 if starts failing
    int srvCnt = 4;

    startGridsMultiThreaded(srvCnt);

    client = true;

    IgniteEx ign = startGrid(srvCnt);

    IgniteCache<Object, Object> cache = ign.getOrCreateCache(basicCcfg()
        .setBackups(2));

    ArrayList<Integer> keys = new ArrayList<>();

    int vid = 3;

    IgniteEx victim = grid(vid);

    Affinity<Object> aff = ign.affinity(DEFAULT_CACHE_NAME);

    // First key: primary on the victim and not backed up on grid(0).
    for (int i = 0; i < 100; i++) {
        if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(0).localNode(), i)) {
            keys.add(i);
            break;
        }
    }

    // Second key: primary on the victim and not backed up on grid(1).
    for (int i = 0; i < 100; i++) {
        if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(1).localNode(), i)) {
            keys.add(i);
            break;
        }
    }

    assert keys.size() == 2 && !keys.contains(99);

    // prevent prepare on one backup
    ((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi())
        .blockMessages(GridDhtTxPrepareRequest.class, grid(0).name());

    GridNearTxLocal nearTx = ((TransactionProxyImpl)ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();

    for (Integer k : keys)
        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));

    // Transactions active on the servers other than the victim.
    List<IgniteInternalTx> txs = IntStream.range(0, srvCnt)
        .mapToObj(this::grid)
        .filter(g -> g != victim)
        .map(g -> txsOnNode(g, nearTx.xidVersion()))
        .flatMap(Collection::stream)
        .collect(Collectors.toList());

    nearTx.commitAsync();

    // await tx partially prepared
    assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED));

    // latch1: background tx has done its update; latch2: background tx may finish.
    CountDownLatch latch1 = new CountDownLatch(1);
    CountDownLatch latch2 = new CountDownLatch(1);

    IgniteInternalFuture<Object> backgroundTxFut = GridTestUtils.runAsync(() -> {
        try (Transaction ignored = ign.transactions().txStart()) {
            boolean upd = false;

            for (int i = 100; i < 200; i++) {
                if (!aff.isPrimary(victim.localNode(), i)) {
                    cache.put(i, 11);
                    upd = true;
                    break;
                }
            }

            assert upd;

            latch1.countDown();

            // A timeout here means the main thread never released us: fail instead of silently going on.
            assertTrue(latch2.await(getTestTimeout(), TimeUnit.MILLISECONDS));
        }

        return null;
    });

    List<IgniteEx> liveNodes = grids(srvCnt, i -> i != vid);

    try {
        // The background update must be in place before the primary is dropped.
        assertTrue(latch1.await(getTestTimeout(), TimeUnit.MILLISECONDS));

        // drop primary
        victim.close();

        // do all assertions before rebalance
        assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == ROLLED_BACK));

        assertPartitionCountersAreConsistent(keys, liveNodes);
    }
    finally {
        // Always release the background transaction, even when an assertion above has failed.
        latch2.countDown();
    }

    backgroundTxFut.get(getTestTimeout());

    assertTrue(liveNodes.stream()
        .map(node -> node.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("select * from Integer")).getAll())
        .allMatch(Collection::isEmpty));
}
/**
 * Starts a coordinator and a partition owner, then lets a third node join while the coordinator's
 * {@link GridDhtPartitionsFullMessage} to it is blocked. A transaction started on the partition owner is
 * committed asynchronously with its DHT prepare request to the joining node blocked; once a transaction on
 * the coordinator is {@code PREPARED}, the partition owner is stopped, the full message is released and
 * the joining node is expected to finish its start, leaving a two-node cluster.
 *
 * @throws Exception if failed.
 */
@Test
public void testTxRecoveryWithLostFullMessageOnJoiningBackupNode() throws Exception {
    int joiningBackupNodeId = 2;

    IgniteEx crd = startGrid(0);

    IgniteEx partOwner = startGrid(1);

    IgniteCache<Object, Object> cache = partOwner.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
        .setAtomicityMode(TRANSACTIONAL)
        .setCacheMode(PARTITIONED)
        .setIndexedTypes(Integer.class, Integer.class)
        .setBackups(2));

    TestRecordingCommunicationSpi crdSpi = (TestRecordingCommunicationSpi)crd.configuration().getCommunicationSpi();

    // Prevent the FullMessage from reaching the joining backup node.
    crdSpi.blockMessages(GridDhtPartitionsFullMessage.class, getTestIgniteInstanceName(joiningBackupNodeId));

    // The join cannot finish until the full message is released, so it runs in the background.
    // Using a future (instead of a raw thread) makes a start failure fail the test.
    IgniteInternalFuture<Object> joinFut = GridTestUtils.runAsync(() -> {
        startGrid(joiningBackupNodeId);

        return null;
    });

    assertTrue(GridTestUtils.waitForCondition(() -> crd.cluster().nodes().size() == 3, 10_000));

    ArrayList<Integer> keys = new ArrayList<>();

    Affinity<Object> aff = crd.affinity(DEFAULT_CACHE_NAME);

    // A single key whose primary is the partition owner.
    for (int i = 0; i < 100; i++) {
        if (aff.isPrimary(partOwner.localNode(), i)) {
            keys.add(i);
            break;
        }
    }

    ((TestRecordingCommunicationSpi)partOwner.configuration().getCommunicationSpi())
        .blockMessages(GridDhtTxPrepareRequest.class, getTestIgniteInstanceName(joiningBackupNodeId));

    GridNearTxLocal nearTx = ((TransactionProxyImpl)partOwner.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();

    for (Integer k : keys)
        cache.put(k, k);

    nearTx.commitAsync();

    // Checks that PartitionCountersNeighborcastRequest will be sent after primary node left.
    IgniteTxManager tm = crd.context().cache().context().tm();

    assertTrue(GridTestUtils.waitForCondition(() -> !tm.activeTransactions().isEmpty(), 10_000));

    // Streaming keeps the predicate safe if the collection happens to be empty between polls.
    assertTrue(GridTestUtils.waitForCondition(
        () -> tm.activeTransactions().stream().anyMatch(tx -> tx.state() == PREPARED), 10_000));

    // Primary node left.
    partOwner.close();

    // Node with backup fetch lost FullMessage and starts.
    crdSpi.stopBlock();

    // Bounded wait; rethrows whatever made the joining node fail to start.
    joinFut.get(getTestTimeout());

    awaitPartitionMapExchange();

    assertEquals(2, crd.cluster().nodes().size());
}
/**
 * Starts three servers and a client and picks two keys of one partition whose primary is {@code grid(1)}.
 * Transaction A inserts the first key while the first two DHT prepare requests sent by the primary are
 * blocked; transaction B then inserts the second key and commits. After the primary is stopped, transaction A
 * is expected to be rolled back on the backups, exactly one row must be visible there and the backups'
 * update counters must equal the counter read from the primary before it was stopped.
 *
 * @throws Exception if failed.
 */
@Test
public void testUpdateCountersGapIsClosed() throws Exception {
int srvCnt = 3;
startGridsMultiThreaded(srvCnt);
client = true;
IgniteEx ign = startGrid(srvCnt);
IgniteCache<Object, Object> cache = ign.getOrCreateCache(
basicCcfg().setBackups(2));
int vid = 1;
IgniteEx victim = grid(vid);
ArrayList<Integer> keys = new ArrayList<>();
Integer part = null;
Affinity<Object> aff = ign.affinity(DEFAULT_CACHE_NAME);
// Collect two keys that map to the same partition, taken from the first key found with the victim as primary.
for (int i = 0; i < 2000; i++) {
int p = aff.partition(i);
if (aff.isPrimary(victim.localNode(), i)) {
if (part == null) part = p;
if (p == part) keys.add(i);
if (keys.size() == 2) break;
}
}
assert keys.size() == 2;
Transaction txA = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
// prevent first transaction prepare on backups
((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi())
.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
// Counts the DHT prepare requests seen so far; only the first two are blocked.
final AtomicInteger limiter = new AtomicInteger();
@Override public boolean apply(ClusterNode node, Message msg) {
if (msg instanceof GridDhtTxPrepareRequest)
return limiter.getAndIncrement() < 2;
return false;
}
});
cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(0)));
txA.commitAsync();
GridCacheVersion aXidVer = ((TransactionProxyImpl)txA).tx().xidVersion();
// Wait until transaction A is in the PREPARING state on the primary.
assertConditionEventually(() -> txsOnNode(victim, aXidVer).stream()
.anyMatch(tx -> tx.state() == PREPARING));
// Transaction B runs in a separate thread and commits the second key.
GridTestUtils.runAsync(() -> {
try (Transaction txB = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(1)));
txB.commit();
}
}).get();
// Counter of the shared partition on the primary, remembered before the primary is stopped.
long victimUpdCntr = updateCounter(victim.cachex(DEFAULT_CACHE_NAME).context(), keys.get(0));
List<IgniteEx> backupNodes = grids(srvCnt, i -> i != vid);
// Transactions active on the backups, captured while the primary is still alive.
List<IgniteInternalTx> backupTxsA = backupNodes.stream()
.map(node -> txsOnNode(node, aXidVer))
.flatMap(Collection::stream)
.collect(Collectors.toList());
// drop primary
victim.close();
assertConditionEventually(() -> backupTxsA.stream().allMatch(tx -> tx.state() == ROLLED_BACK));
// Only the row written by transaction B must be visible on each backup.
backupNodes.stream()
.map(node -> node.cache(DEFAULT_CACHE_NAME))
.forEach(c -> {
assertEquals(1, c.query(new SqlFieldsQuery("select * from Integer")).getAll().size());
});
// The backups' counters must match the value read from the primary.
backupNodes.forEach(node -> {
for (Integer k : keys)
assertEquals(victimUpdCntr, updateCounter(node.cachex(DEFAULT_CACHE_NAME).context(), k));
});
}
/**
 * Creates the cache configuration shared by most tests of this class.
 *
 * @return Configuration of a {@code PARTITIONED}, {@code TRANSACTIONAL_SNAPSHOT} cache named
 *      {@code DEFAULT_CACHE_NAME} with {@code Integer -> Integer} indexed types.
 */
private static CacheConfiguration<Object, Object> basicCcfg() {
    CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

    ccfg.setAtomicityMode(TRANSACTIONAL_SNAPSHOT);
    ccfg.setCacheMode(PARTITIONED);
    ccfg.setIndexedTypes(Integer.class, Integer.class);

    return ccfg;
}
/**
 * Collects the transactions currently active on the given node, checking that each of them reports the
 * expected near transaction version.
 *
 * @param node Node whose transaction manager is inspected.
 * @param xidVer Near transaction version every active transaction must report.
 * @return Active transactions of the node; asserted to be non-empty.
 */
private static List<IgniteInternalTx> txsOnNode(IgniteEx node, GridCacheVersion xidVer) {
    IgniteTxManager txMgr = node.context().cache().context().tm();

    List<IgniteInternalTx> activeTxs = new ArrayList<>();

    for (IgniteInternalTx activeTx : txMgr.activeTransactions()) {
        assertEquals(xidVer, activeTx.nearXidVersion());

        activeTxs.add(activeTx);
    }

    assert !activeTxs.isEmpty();

    return activeTxs;
}
/**
 * Fails the test unless the given condition becomes true within five seconds.
 *
 * @param p Condition to poll.
 * @throws IgniteInterruptedCheckedException If the waiting thread was interrupted.
 */
private static void assertConditionEventually(GridAbsPredicate p)
    throws IgniteInterruptedCheckedException {
    boolean satisfied = GridTestUtils.waitForCondition(p, 5_000);

    if (satisfied)
        return;

    fail();
}
/**
 * Returns the started nodes whose indexes lie in {@code [0, cnt)} and are accepted by the predicate.
 *
 * @param cnt Exclusive upper bound of node indexes.
 * @param p Filter applied to node indexes.
 * @return Matching nodes in ascending index order.
 */
private List<IgniteEx> grids(int cnt, IntPredicate p) {
    List<IgniteEx> selected = new ArrayList<>();

    for (int idx = 0; idx < cnt; idx++) {
        if (p.test(idx))
            selected.add(grid(idx));
    }

    return selected;
}
/**
 * For every key, asserts that all given nodes that are primary or backup for it report the same
 * update counter for the key's partition.
 *
 * @param keys Keys whose partitions are checked.
 * @param nodes Nodes to compare; each must be an {@link IgniteEx}.
 */
private void assertPartitionCountersAreConsistent(Iterable<Integer> keys, Iterable<? extends Ignite> nodes) {
    for (Integer key : keys) {
        // -1 marks "no owner inspected yet" for the current key.
        long expCntr = -1;

        for (Ignite ignite : nodes) {
            IgniteEx node = (IgniteEx)ignite;

            boolean owner = node.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(node.localNode(), key);

            if (!owner)
                continue;

            long actualCntr = updateCounter(node.cachex(DEFAULT_CACHE_NAME).context(), key);

            // The first owner seen provides the reference value.
            expCntr = expCntr == -1 ? actualCntr : expCntr;

            assertEquals(expCntr, actualCntr);
        }
    }
}
/**
 * Reads the update counter of the local data store that holds the partition of the given key.
 *
 * @param cctx Cache context of the node to inspect.
 * @param key Key whose partition counter is read.
 * @return Update counter of the matching data store.
 * @throws AssertionError If the node has no data store for the key's partition.
 */
private static long updateCounter(GridCacheContext<?, ?> cctx, Object key) {
    return dataStore(cctx, key)
        .map(IgniteCacheOffheapManager.CacheDataStore::updateCounter)
        // Fail with context rather than with the bare "No value present" of Optional.get().
        .orElseThrow(() -> new AssertionError("No local data store found for the partition of key: " + key));
}
/**
 * Looks up the local data store whose partition id equals the partition the given key maps to.
 *
 * @param cctx Cache context of the node to inspect.
 * @param key Key whose partition is looked up.
 * @return First matching data store, or an empty optional if none of the local stores matches.
 */
private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore(
    GridCacheContext<?, ?> cctx, Object key) {
    int part = cctx.affinity().partition(key);

    for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) {
        if (store.partId() == part)
            return Optional.of(store);
    }

    return Optional.empty();
}
}