blob: e57aeb3e502f9dab816a7f67b3136742c11e8509 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CachePeekMode.ALL;
import static org.apache.ignite.cache.CachePeekMode.BACKUP;
import static org.apache.ignite.cache.CachePeekMode.NEAR;
import static org.apache.ignite.cache.CachePeekMode.OFFHEAP;
import static org.apache.ignite.cache.CachePeekMode.ONHEAP;
import static org.apache.ignite.cache.CachePeekMode.PRIMARY;
/**
* Tests for methods using {@link CachePeekMode}:
* <ul>
* <li>{@link IgniteCache#localPeek(Object, CachePeekMode...)}</li>
* <li>{@link IgniteCache#localSize(CachePeekMode...)}</li>
* <li>{@link IgniteCache#size(CachePeekMode...)}</li>
* <li>{@link IgniteCache#localEntries(CachePeekMode...)}</li>
* </ul>
*/
public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstractTest {
/** */
private static final int HEAP_ENTRIES = 30;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
return cfg;
}
/** {@inheritDoc} */
@Override protected NearCacheConfiguration nearConfiguration() {
return null;
}
/**
* @return Has near cache flag.
*/
protected boolean hasNearCache() {
return false;
}
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
CacheConfiguration ccfg = super.cacheConfiguration(igniteInstanceName);
ccfg.setBackups(1);
if (hasNearCache())
ccfg.setNearConfiguration(new NearCacheConfiguration());
FifoEvictionPolicy plc = new FifoEvictionPolicy();
plc.setMaxSize(HEAP_ENTRIES);
ccfg.setEvictionPolicy(plc);
ccfg.setOnheapCacheEnabled(true);
return ccfg;
}
/**
* @throws Exception If failed.
*/
@Test
public void testLocalPeek() throws Exception {
checkAffinityPeek(0);
checkAffinityPeek(1);
checkStorage(0);
checkStorage(1);
}
/**
* @param nodeIdx Node index.
* @throws Exception If failed.
*/
private void checkAffinityPeek(int nodeIdx) throws Exception {
IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
final String val = "1";
Integer key = null;
try {
if (cacheMode() == REPLICATED) {
key = backupKey(cache0);
cache0.put(key, val);
assertEquals(val, cache0.localPeek(key, ALL));
assertEquals(val, cache0.localPeek(key, BACKUP));
assertNull(cache0.localPeek(key, NEAR));
assertNull(cache0.localPeek(key, PRIMARY));
}
else {
key = nearKey(cache0);
cache0.put(key, val);
if (hasNearCache()) {
assertEquals(val, cache0.localPeek(key, NEAR));
assertEquals(val, cache0.localPeek(key, ALL));
}
else {
assertNull(cache0.localPeek(key, NEAR));
assertNull(cache0.localPeek(key, ALL));
}
assertNull(cache0.localPeek(key, PRIMARY));
assertNull(cache0.localPeek(key, BACKUP));
}
Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
for (int i = 0; i < gridCount(); i++) {
if (i == nodeIdx)
continue;
IgniteCache<Integer, String> cache = jcache(i);
assertNull(cache.localPeek(key, NEAR));
if (aff.isPrimary(ignite(i).cluster().localNode(), key)) {
assertEquals(val, cache.localPeek(key, PRIMARY));
assertEquals(val, cache.localPeek(key, ALL));
assertNull(cache.localPeek(key, BACKUP));
assertNull(cache.localPeek(key, NEAR));
}
else if (aff.isBackup(ignite(i).cluster().localNode(), key)) {
assertEquals(val, cache.localPeek(key, BACKUP));
assertEquals(val, cache.localPeek(key, ALL));
assertNull(cache.localPeek(key, PRIMARY));
assertNull(cache.localPeek(key, NEAR));
}
else {
assertNull(cache.localPeek(key, ALL));
assertNull(cache.localPeek(key, PRIMARY));
assertNull(cache.localPeek(key, BACKUP));
assertNull(cache.localPeek(key, NEAR));
}
}
}
finally {
if (key != null)
cache0.remove(key);
}
}
/**
* @param nodeIdx Node index.
* @throws Exception If failed.
*/
private void checkStorage(int nodeIdx) throws Exception {
if (true) // TODO GG-11148.
return;
IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
List<Integer> keys = primaryKeys(cache0, 100, 10_000);
try {
final String val = "test_value";
for (Integer key : keys)
cache0.put(key, val);
Ignite ignite = ignite(nodeIdx);
GridCacheAdapter<Integer, String> internalCache =
((IgniteKernal)ignite).context().cache().internalCache(DEFAULT_CACHE_NAME);
CacheObjectContext coctx = internalCache.context().cacheObjectContext();
Set<Integer> swapKeys = new HashSet<>();
// TODO GG-11148.
// SwapSpaceSpi swap = ignite.configuration().getSwapSpaceSpi();
//
// IgniteSpiCloseableIterator<KeyCacheObject> it = swap.keyIterator(SPACE_NAME, null);
IgniteSpiCloseableIterator<KeyCacheObject> it = new GridEmptyCloseableIterator<>();
assertNotNull(it);
while (it.hasNext()) {
KeyCacheObject next = it.next();
assertTrue(swapKeys.add((Integer)next.value(coctx, false)));
}
assertFalse(swapKeys.isEmpty());
assertTrue(swapKeys.size() + HEAP_ENTRIES < 100);
Set<Integer> offheapKeys = new HashSet<>();
// TODO GG-11148.
Iterator<Map.Entry<Integer, String>> offheapIt = Collections.EMPTY_MAP.entrySet().iterator();
// if (internalCache.context().isNear())
// offheapIt = internalCache.context().near().dht().context().swap().lazyOffHeapIterator(false);
// else
// offheapIt = internalCache.context().swap().lazyOffHeapIterator(false);
while (offheapIt.hasNext()) {
Map.Entry<Integer, String> e = offheapIt.next();
assertTrue(offheapKeys.add(e.getKey()));
assertFalse(swapKeys.contains(e.getKey()));
}
assertFalse(offheapKeys.isEmpty());
Set<Integer> heapKeys = new HashSet<>(keys);
heapKeys.removeAll(offheapKeys);
heapKeys.removeAll(swapKeys);
assertFalse(heapKeys.isEmpty());
log.info("Keys [swap=" + swapKeys.size() +
", offheap=" + offheapKeys.size() +
", heap=" + heapKeys.size() + ']');
assertEquals(100, swapKeys.size() + offheapKeys.size() + heapKeys.size());
for (Integer key : swapKeys) {
assertEquals(val, cache0.localPeek(key));
assertEquals(val, cache0.localPeek(key, PRIMARY));
assertEquals(val, cache0.localPeek(key, ONHEAP));
assertEquals(val, cache0.localPeek(key, ONHEAP, OFFHEAP));
assertEquals(val, cache0.localPeek(key, PRIMARY, ONHEAP));
assertEquals(val, cache0.localPeek(key, PRIMARY, ONHEAP, OFFHEAP));
assertNull(cache0.localPeek(key, BACKUP));
assertNull(cache0.localPeek(key, NEAR));
assertNull(cache0.localPeek(key, ONHEAP));
assertNull(cache0.localPeek(key, OFFHEAP));
}
for (Integer key : offheapKeys) {
assertEquals(val, cache0.localPeek(key, OFFHEAP));
assertEquals(val, cache0.localPeek(key, ONHEAP, OFFHEAP));
assertEquals(val, cache0.localPeek(key, ONHEAP, OFFHEAP));
assertEquals(val, cache0.localPeek(key, PRIMARY, OFFHEAP));
assertNull(cache0.localPeek(key, OFFHEAP, BACKUP));
assertNull(cache0.localPeek(key, OFFHEAP, NEAR));
assertNull(cache0.localPeek(key, ONHEAP));
assertNull(cache0.localPeek(key));
}
for (Integer key : heapKeys) {
assertEquals(val, cache0.localPeek(key, ONHEAP));
assertEquals(val, cache0.localPeek(key, ONHEAP));
assertEquals(val, cache0.localPeek(key, OFFHEAP, ONHEAP));
assertEquals(val, cache0.localPeek(key, PRIMARY, ONHEAP));
assertNull(cache0.localPeek(key, ONHEAP, BACKUP));
assertNull(cache0.localPeek(key, ONHEAP, NEAR));
assertNull(cache0.localPeek(key));
assertNull(cache0.localPeek(key, OFFHEAP));
}
}
finally {
cache0.removeAll(new HashSet<>(keys));
}
}
/**
* @param nodeIdx Node index.
* @throws Exception If failed.
*/
private void checkSizeAffinityFilter(int nodeIdx) throws Exception {
IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
final int PUT_KEYS = 10;
List<Integer> keys = null;
try {
if (cacheMode() == REPLICATED) {
keys = backupKeys(cache0, 10, 0);
for (Integer key : keys)
cache0.put(key, String.valueOf(key));
assertEquals(PUT_KEYS, cache0.localSize(BACKUP));
assertEquals(PUT_KEYS, cache0.localSize(ALL));
assertEquals(0, cache0.localSize());
assertEquals(0, cache0.localSize(PRIMARY));
assertEquals(0, cache0.localSize(NEAR));
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(0, cache.size(NEAR));
assertEquals(PUT_KEYS, cache.size(PRIMARY));
assertEquals(PUT_KEYS * (gridCount() - 1), cache.size(BACKUP));
assertEquals(PUT_KEYS * gridCount(), cache.size(PRIMARY, BACKUP));
assertEquals(PUT_KEYS * gridCount(), cache.size(ALL)); // Primary + backups.
}
}
else {
keys = nearKeys(cache0, PUT_KEYS, 0);
for (Integer key : keys)
cache0.put(key, String.valueOf(key));
if (hasNearCache()) {
assertEquals(0, cache0.localSize());
assertEquals(PUT_KEYS, cache0.localSize(ALL));
assertEquals(PUT_KEYS, cache0.localSize(NEAR));
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(PUT_KEYS, cache.size(NEAR));
assertEquals(PUT_KEYS, cache.size(BACKUP));
assertEquals(PUT_KEYS * 2, cache.size(PRIMARY, BACKUP));
assertEquals(PUT_KEYS * 2 + PUT_KEYS, cache.size(ALL)); // Primary + backups + near.
}
}
else {
assertEquals(0, cache0.localSize());
assertEquals(0, cache0.localSize(ALL));
assertEquals(0, cache0.localSize(NEAR));
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(0, cache.size(NEAR));
assertEquals(PUT_KEYS, cache.size(BACKUP));
assertEquals(PUT_KEYS * 2, cache.size(PRIMARY, BACKUP));
assertEquals(PUT_KEYS * 2, cache.size(ALL)); // Primary + backups.
}
}
assertEquals(0, cache0.localSize(BACKUP));
assertEquals(0, cache0.localSize(PRIMARY));
}
checkPrimarySize(PUT_KEYS);
Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
for (int i = 0; i < gridCount(); i++) {
if (i == nodeIdx)
continue;
ClusterNode node = ignite(i).cluster().localNode();
int primary = 0;
int backups = 0;
for (Integer key : keys) {
if (aff.isPrimary(node, key))
primary++;
else if (aff.isBackup(node, key))
backups++;
}
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(primary, cache.localSize(PRIMARY));
assertEquals(backups, cache.localSize(BACKUP));
assertEquals(primary + backups, cache.localSize(PRIMARY, BACKUP));
assertEquals(primary + backups, cache.localSize(BACKUP, PRIMARY));
assertEquals(primary + backups, cache.localSize(ALL));
}
cache0.remove(keys.get(0));
checkPrimarySize(PUT_KEYS - 1);
if (cacheMode() == REPLICATED) {
assertEquals(PUT_KEYS - 1, cache0.localSize(ALL));
assertEquals(0, cache0.localSize(PRIMARY));
assertEquals(PUT_KEYS - 1, cache0.localSize(BACKUP));
}
else {
if (hasNearCache())
assertEquals(PUT_KEYS - 1, cache0.localSize(ALL));
else
assertEquals(0, cache0.localSize(ALL));
}
}
finally {
if (keys != null)
cache0.removeAll(new HashSet<>(keys));
}
checkEmpty();
}
/**
* @param nodeIdx Node index.
* @throws Exception If failed.
*/
private void checkPartitionSizeAffinityFilter(int nodeIdx) throws Exception {
IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
final int PUT_KEYS = 10;
int part = nodeIdx;
List<Integer> keys = null;
try {
if (cacheMode() == REPLICATED) {
keys = backupKeys(cache0, 10, 0);
for (Integer key : keys)
cache0.put(key, String.valueOf(key));
int partSize = 0;
for (Integer key : keys) {
int keyPart = ignite(nodeIdx).affinity(DEFAULT_CACHE_NAME).partition(key);
if (keyPart == part)
partSize++;
}
assertEquals(PUT_KEYS, cache0.localSize(BACKUP));
assertEquals(PUT_KEYS, cache0.localSize(ALL));
assertEquals(partSize, cache0.localSizeLong(part, BACKUP));
assertEquals(partSize, cache0.localSizeLong(part, ALL));
assertEquals(0, cache0.localSizeLong(part, PRIMARY));
assertEquals(0, cache0.localSizeLong(part, NEAR));
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(0, cache.size(NEAR));
assertEquals(partSize, cache.sizeLong(part, PRIMARY));
assertEquals(partSize * (gridCount() - 1), cache.sizeLong(part, BACKUP));
assertEquals(partSize * gridCount(), cache.sizeLong(part, PRIMARY, BACKUP));
assertEquals(partSize * gridCount(), cache.sizeLong(part, ALL)); // Primary + backups.
}
}
else {
keys = nearKeys(cache0, PUT_KEYS, 0);
for (Integer key : keys)
cache0.put(key, String.valueOf(key));
int partSize = 0;
for (Integer key :keys) {
int keyPart = ignite(nodeIdx).affinity(DEFAULT_CACHE_NAME).partition(key);
if (keyPart == part)
partSize++;
}
if (hasNearCache()) {
assertEquals(0, cache0.localSize());
assertEquals(0, cache0.localSizeLong(part, ALL));
assertEquals(0, cache0.localSizeLong(part, NEAR));
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(0, cache.sizeLong(part, NEAR));
assertEquals(partSize, cache.sizeLong(part, BACKUP));
assertEquals(partSize * 2, cache.sizeLong(part, PRIMARY, BACKUP));
assertEquals(partSize * 2, cache.sizeLong(part, ALL)); // Primary + backups + near.
}
}
else {
assertEquals(0, cache0.localSize());
//assertEquals(partitionSize, cache0.localSizeLong(partition, ALL));
assertEquals(0, cache0.localSizeLong(part, NEAR));
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(0, cache.size(NEAR));
assertEquals(partSize, cache.sizeLong(part, BACKUP));
assertEquals(partSize * 2, cache.sizeLong(part, PRIMARY, BACKUP));
assertEquals(partSize * 2, cache.sizeLong(part, ALL)); // Primary + backups.
}
}
assertEquals(0, cache0.localSize(BACKUP));
assertEquals(0, cache0.localSize(PRIMARY));
}
checkPrimarySize(PUT_KEYS);
Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
for (int i = 0; i < gridCount(); i++) {
if (i == nodeIdx)
continue;
ClusterNode node = ignite(i).cluster().localNode();
int primary = 0;
int backups = 0;
for (Integer key : keys) {
if (aff.isPrimary(node, key) && aff.partition(key) == part)
primary++;
else if (aff.isBackup(node, key) && aff.partition(key) == part)
backups++;
}
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(primary, cache.localSizeLong(part, PRIMARY));
assertEquals(backups, cache.localSizeLong(part, BACKUP));
assertEquals(primary + backups, cache.localSizeLong(part, PRIMARY, BACKUP));
assertEquals(primary + backups, cache.localSizeLong(part, BACKUP, PRIMARY));
assertEquals(primary + backups, cache.localSizeLong(part, ALL));
}
cache0.remove(keys.get(0));
keys.remove(0);
checkPrimarySize(PUT_KEYS - 1);
int primary = 0;
int backups = 0;
ClusterNode node = ignite(nodeIdx).cluster().localNode();
for (Integer key : keys) {
if (aff.isPrimary(node, key) && aff.partition(key) == part)
primary++;
else if (aff.isBackup(node, key) && aff.partition(key) == part)
backups++;
}
if (cacheMode() == REPLICATED) {
assertEquals(primary + backups, cache0.localSizeLong(part, ALL));
assertEquals(primary, cache0.localSizeLong(part, PRIMARY));
assertEquals(backups, cache0.localSizeLong(part, BACKUP));
}
else {
if (hasNearCache())
assertEquals(0, cache0.localSizeLong(part, ALL));
else
assertEquals(0, cache0.localSizeLong(part, ALL));
}
}
finally {
if (keys != null)
cache0.removeAll(new HashSet<>(keys));
}
checkEmpty();
}
/**
* Checks size is zero.
*/
private void checkEmpty() {
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(0, cache.localSize());
assertEquals(0, cache.size());
for (CachePeekMode peekMode : CachePeekMode.values()) {
assertEquals(0, cache.localSize(peekMode));
assertEquals(0, cache.size(peekMode));
}
}
checkPrimarySize(0);
}
/**
* @param nodeIdx Node index.
* @return Tuple with primary and backup keys.
*/
private T2<List<Integer>, List<Integer>> swapKeys(int nodeIdx) {
// TODO: GG-11148.
// SwapSpaceSpi swap = ignite(nodeIdx).configuration().getSwapSpaceSpi();
//
// IgniteSpiCloseableIterator<KeyCacheObject> it = swap.keyIterator(SPACE_NAME, null);
IgniteSpiCloseableIterator<KeyCacheObject> it = new GridEmptyCloseableIterator<>();
assertNotNull(it);
Affinity aff = ignite(nodeIdx).affinity(DEFAULT_CACHE_NAME);
ClusterNode node = ignite(nodeIdx).cluster().localNode();
List<Integer> primary = new ArrayList<>();
List<Integer> backups = new ArrayList<>();
CacheObjectContext coctx = ((IgniteEx)ignite(nodeIdx)).context().cache().internalCache(DEFAULT_CACHE_NAME)
.context().cacheObjectContext();
while (it.hasNext()) {
Integer key = it.next().value(coctx, false);
if (aff.isPrimary(node, key))
primary.add(key);
else {
assertTrue(aff.isBackup(node, key));
backups.add(key);
}
}
return new T2<>(primary, backups);
}
/**
* @param nodeIdx Node index.
* @return Tuple with number of primary and backup keys.
*/
private T2<Integer, Integer> swapKeysCount(int nodeIdx) {
T2<List<Integer>, List<Integer>> keys = swapKeys(nodeIdx);
return new T2<>(keys.get1().size(), keys.get2().size());
}
/**
* @param nodeIdx Node index.
* @return Tuple with primary and backup keys.
*/
private T2<List<Integer>, List<Integer>> offheapKeys(int nodeIdx) {
GridCacheAdapter<Integer, String> internalCache =
((IgniteKernal)ignite(nodeIdx)).context().cache().internalCache(DEFAULT_CACHE_NAME);
// TODO GG-11148.
Iterator<Map.Entry<Integer, String>> offheapIt = Collections.EMPTY_MAP.entrySet().iterator();
// if (internalCache.context().isNear())
// offheapIt = internalCache.context().near().dht().context().swap().lazyOffHeapIterator(false);
// else
// offheapIt = internalCache.context().swap().lazyOffHeapIterator(false);
Affinity aff = ignite(nodeIdx).affinity(DEFAULT_CACHE_NAME);
ClusterNode node = ignite(nodeIdx).cluster().localNode();
List<Integer> primary = new ArrayList<>();
List<Integer> backups = new ArrayList<>();
while (offheapIt.hasNext()) {
Map.Entry<Integer, String> e = offheapIt.next();
if (aff.isPrimary(node, e.getKey()))
primary.add(e.getKey());
else {
assertTrue(aff.isBackup(node, e.getKey()));
backups.add(e.getKey());
}
}
return new T2<>(primary, backups);
}
/**
* @param nodeIdx Node index.
* @return Tuple with number of primary and backup keys.
*/
private T2<Integer, Integer> offheapKeysCount(int nodeIdx) {
T2<List<Integer>, List<Integer>> keys = offheapKeys(nodeIdx);
return new T2<>(keys.get1().size(), keys.get2().size());
}
/**
* @param nodeIdx Node index.
* @param part Cache partition.
* @return Tuple with number of primary and backup keys (one or both will be zero).
*/
private T2<Integer, Integer> offheapKeysCount(int nodeIdx, int part) throws IgniteCheckedException {
GridCacheContext ctx = ((IgniteEx)ignite(nodeIdx)).context().cache().internalCache(DEFAULT_CACHE_NAME).context();
// Swap and offheap are disabled for near cache.
IgniteCacheOffheapManager offheapManager = ctx.isNear() ? ctx.near().dht().context().offheap() : ctx.offheap();
//First count entries...
int cnt = (int)offheapManager.cacheEntriesCount(ctx.cacheId(), part);
GridCacheAffinityManager affinity = ctx.affinity();
AffinityTopologyVersion topVer = affinity.affinityTopologyVersion();
//And then find out whether they are primary or backup ones.
int primaryCnt = 0;
int backupCnt = 0;
if (affinity.primaryByPartition(ctx.localNode(), part, topVer))
primaryCnt = cnt;
else if (affinity.backupByPartition(ctx.localNode(), part, topVer))
backupCnt = cnt;
return new T2<>(primaryCnt, backupCnt);
}
/**
* @param nodeIdx Node index.
* @throws Exception If failed.
*/
private void checkSizeStorageFilter(int nodeIdx) throws Exception {
if (true) // TODO GG-11148.
return;
IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
List<Integer> primaryKeys = primaryKeys(cache0, 100, 10_000);
List<Integer> backupKeys = backupKeys(cache0, 100, 10_000);
try {
final String val = "test_value";
for (int i = 0; i < 100; i++) {
cache0.put(primaryKeys.get(i), val);
cache0.put(backupKeys.get(i), val);
}
int totalKeys = 200;
T2<Integer, Integer> swapKeys = swapKeysCount(nodeIdx);
assertTrue(swapKeys.get1() > 0);
assertTrue(swapKeys.get2() > 0);
T2<Integer, Integer> offheapKeys = offheapKeysCount(nodeIdx);
assertTrue(offheapKeys.get1() > 0);
assertTrue(offheapKeys.get2() > 0);
int totalSwap = swapKeys.get1() + swapKeys.get2();
int totalOffheap = offheapKeys.get1() + offheapKeys.get2();
log.info("Local keys [total=" + totalKeys + ", offheap=" + offheapKeys + ", swap=" + swapKeys + ']');
assertTrue(totalSwap + totalOffheap < totalKeys);
assertEquals(primaryKeys.size(), cache0.localSize());
assertEquals(totalKeys, cache0.localSize(ALL));
assertEquals(totalOffheap, cache0.localSize(PRIMARY, BACKUP, NEAR, OFFHEAP));
assertEquals(totalSwap, cache0.localSize(PRIMARY, BACKUP, NEAR));
assertEquals(totalKeys - (totalOffheap + totalSwap), cache0.localSize(PRIMARY, BACKUP, NEAR, ONHEAP));
assertEquals(totalKeys, cache0.localSize(PRIMARY, BACKUP, NEAR, OFFHEAP, ONHEAP));
assertEquals(swapKeys.get1(), (Integer)cache0.localSize(PRIMARY));
assertEquals(swapKeys.get2(), (Integer)cache0.localSize(BACKUP));
assertEquals(offheapKeys.get1(), (Integer)cache0.localSize(OFFHEAP, PRIMARY));
assertEquals(offheapKeys.get2(), (Integer)cache0.localSize(OFFHEAP, BACKUP));
assertEquals(swapKeys.get1() + offheapKeys.get1(), cache0.localSize(OFFHEAP, PRIMARY));
assertEquals(swapKeys.get2() + offheapKeys.get2(), cache0.localSize(OFFHEAP, BACKUP));
assertEquals(totalSwap + totalOffheap, cache0.localSize(PRIMARY, BACKUP, NEAR, OFFHEAP));
int globalSwapPrimary = 0;
int globalSwapBackup = 0;
int globalOffheapPrimary = 0;
int globalOffheapBackup = 0;
for (int i = 0; i < gridCount(); i++) {
T2<Integer, Integer> swap = swapKeysCount(i);
globalSwapPrimary += swap.get1();
globalSwapBackup += swap.get2();
T2<Integer, Integer> offheap = offheapKeysCount(i);
globalOffheapPrimary += offheap.get1();
globalOffheapBackup += offheap.get2();
}
int backups;
if (cacheMode() == PARTITIONED)
backups = 1;
else // REPLICATED.
backups = gridCount() - 1;
int globalTotal = totalKeys + totalKeys * backups;
int globalTotalSwap = globalSwapPrimary + globalSwapBackup;
int globalTotalOffheap = globalOffheapPrimary + globalOffheapBackup;
log.info("Global keys [total=" + globalTotal +
", offheap=" + globalTotalOffheap +
", swap=" + globalTotalSwap + ']');
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(totalKeys, cache.size(PRIMARY));
assertEquals(globalTotal, cache.size(ALL));
assertEquals(globalTotal, cache.size(PRIMARY, BACKUP, NEAR, ONHEAP, OFFHEAP));
assertEquals(globalTotal, cache.size(ONHEAP, OFFHEAP, PRIMARY, BACKUP));
assertEquals(globalTotalSwap, cache.size(PRIMARY, BACKUP, NEAR));
assertEquals(globalSwapPrimary, cache.size(PRIMARY));
assertEquals(globalSwapBackup, cache.size(BACKUP));
assertEquals(globalTotalOffheap, cache.size(PRIMARY, BACKUP, NEAR, OFFHEAP));
assertEquals(globalOffheapPrimary, cache.size(OFFHEAP, PRIMARY));
assertEquals(globalOffheapBackup, cache.size(OFFHEAP, BACKUP));
assertEquals(globalTotalSwap + globalTotalOffheap, cache.size(PRIMARY, BACKUP, NEAR, OFFHEAP));
assertEquals(globalSwapPrimary + globalOffheapPrimary, cache.size(OFFHEAP, PRIMARY));
assertEquals(globalSwapBackup + globalOffheapBackup, cache.size(OFFHEAP, BACKUP));
assertEquals(globalTotal - (globalTotalOffheap + globalTotalSwap), cache.size(PRIMARY, BACKUP, NEAR, ONHEAP));
}
}
finally {
cache0.removeAll(new HashSet<>(primaryKeys));
cache0.removeAll(new HashSet<>(backupKeys));
}
checkEmpty();
}
/**
* @param nodeIdx Node index.
* @throws Exception If failed.
*/
private void checkPartitionSizeStorageFilter(int nodeIdx) throws Exception {
IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
int part = nodeIdx;
List<Integer> primaryKeys = primaryKeys(cache0, 100, 10_000);
List<Integer> backupKeys = backupKeys(cache0, 100, 10_000);
try {
final String val = "test_value";
for (int i = 0; i < 100; i++) {
cache0.put(primaryKeys.get(i), val);
cache0.put(backupKeys.get(i), val);
}
int totalKeys = 200;
T2<Integer, Integer> offheapKeys = offheapKeysCount(nodeIdx, part);
int totalOffheap = offheapKeys.get1() + offheapKeys.get2();
log.info("Local keys [total=" + totalKeys + ", offheap=" + offheapKeys + ']');
assertTrue(totalOffheap < totalKeys);
assertEquals(primaryKeys.size(), cache0.localSize());
assertEquals(totalKeys, cache0.localSize(ALL));
assertEquals(totalOffheap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, OFFHEAP));
assertEquals((long)offheapKeys.get1(), cache0.localSizeLong(part, OFFHEAP, PRIMARY));
assertEquals((long)offheapKeys.get2(), cache0.localSizeLong(part, OFFHEAP, BACKUP));
int globalParitionSwapPrimary = 0;
int globalPartSwapBackup = 0;
int globalPartOffheapPrimary = 0;
int globalPartOffheapBackup = 0;
for (int i = 0; i < gridCount(); i++) {
T2<Integer, Integer> offheap = offheapKeysCount(i, part);
globalPartOffheapPrimary += offheap.get1();
globalPartOffheapBackup += offheap.get2();
}
int backups;
if (cacheMode() == PARTITIONED)
backups = 1;
else // REPLICATED.
backups = gridCount() - 1;
int globalTotal = totalKeys + totalKeys * backups;
int globalPartTotalSwap = globalParitionSwapPrimary + globalPartSwapBackup;
int globalPartTotalOffheap = globalPartOffheapPrimary + globalPartOffheapBackup;
log.info("Global keys [total=" + globalTotal +
", offheap=" + globalPartTotalOffheap +
", swap=" + globalPartTotalSwap + ']');
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(totalKeys, cache.size(PRIMARY));
assertEquals(globalTotal, cache.size(ALL));
assertEquals(globalTotal, cache.size(PRIMARY, BACKUP, NEAR, ONHEAP, OFFHEAP));
assertEquals(globalTotal, cache.size(ONHEAP, OFFHEAP, PRIMARY, BACKUP));
assertEquals(globalPartTotalSwap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR));
assertEquals(globalParitionSwapPrimary, cache.sizeLong(part, PRIMARY));
assertEquals(globalPartSwapBackup, cache.sizeLong(part, BACKUP));
assertEquals(globalPartTotalOffheap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, OFFHEAP));
assertEquals(globalPartOffheapPrimary, cache.sizeLong(part, OFFHEAP, PRIMARY));
assertEquals(globalPartOffheapBackup, cache.sizeLong(part, OFFHEAP, BACKUP));
assertEquals(globalPartTotalSwap + globalPartTotalOffheap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, OFFHEAP));
assertEquals(globalParitionSwapPrimary + globalPartOffheapPrimary, cache.sizeLong(part, OFFHEAP, PRIMARY));
assertEquals(globalPartSwapBackup + globalPartOffheapBackup, cache.sizeLong(part, OFFHEAP, BACKUP));
}
}
finally {
cache0.removeAll(new HashSet<>(primaryKeys));
cache0.removeAll(new HashSet<>(backupKeys));
}
checkEmpty();
}
/**
* @param exp Expected size.
*/
private void checkPrimarySize(int exp) {
int size = 0;
for (int i = 0; i < gridCount(); i++) {
IgniteCache<Integer, String> cache = jcache(i);
assertEquals(exp, cache.size(PRIMARY));
size += cache.localSize(PRIMARY);
assertEquals(exp, (int)cache.sizeAsync(PRIMARY).get());
}
assertEquals(exp, size);
}
/**
* @param nodeIdx Node index.
* @throws Exception If failed.
*/
private void checkLocalEntriesStorageFilter(int nodeIdx) throws Exception {
if (true) // TODO GG-11148.
return;
IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
List<Integer> primaryKeys = primaryKeys(cache0, 100, 10_000);
List<Integer> backupKeys = backupKeys(cache0, 100, 10_000);
try {
final String val = "test_value-";
for (int i = 0; i < 100; i++) {
cache0.put(primaryKeys.get(i), val + primaryKeys.get(i));
cache0.put(backupKeys.get(i), val + backupKeys.get(i));
}
int totalKeys = 200;
T2<List<Integer>, List<Integer>> swapKeys = swapKeys(nodeIdx);
assertTrue(!swapKeys.get1().isEmpty());
assertTrue(!swapKeys.get2().isEmpty());
T2<List<Integer>, List<Integer>> offheapKeys = offheapKeys(nodeIdx);
assertTrue(!offheapKeys.get1().isEmpty());
assertTrue(!offheapKeys.get2().isEmpty());
List<Integer> swap = new ArrayList<>();
swap.addAll(swapKeys.get1());
swap.addAll(swapKeys.get2());
assertFalse(swap.isEmpty());
List<Integer> offheap = new ArrayList<>();
offheap.addAll(offheapKeys.get1());
offheap.addAll(offheapKeys.get2());
assertFalse(offheap.isEmpty());
List<Integer> heap = new ArrayList<>();
heap.addAll(primaryKeys);
heap.addAll(backupKeys);
heap.removeAll(swap);
heap.removeAll(offheap);
log.info("Keys [total=" + totalKeys +
", offheap=" + offheap.size() +
", swap=" + swap.size() + ']');
assertFalse(heap.isEmpty());
checkLocalEntries(cache0.localEntries(), val, primaryKeys, backupKeys);
checkLocalEntries(cache0.localEntries(ALL), val, primaryKeys, backupKeys);
checkLocalEntries(cache0.localEntries(ONHEAP, OFFHEAP), val, primaryKeys, backupKeys);
checkLocalEntries(cache0.localEntries(), val, swap);
checkLocalEntries(cache0.localEntries(OFFHEAP), val, offheap);
checkLocalEntries(cache0.localEntries(ONHEAP), val, heap);
checkLocalEntries(cache0.localEntries(OFFHEAP), val, swap, offheap);
checkLocalEntries(cache0.localEntries(ONHEAP), val, swap, heap);
checkLocalEntries(cache0.localEntries(PRIMARY), val, swapKeys.get1());
checkLocalEntries(cache0.localEntries(BACKUP), val, swapKeys.get2());
checkLocalEntries(cache0.localEntries(OFFHEAP, PRIMARY), val, offheapKeys.get1());
checkLocalEntries(cache0.localEntries(OFFHEAP, BACKUP), val, offheapKeys.get2());
checkLocalEntries(cache0.localEntries(OFFHEAP, PRIMARY), val, swapKeys.get1(), offheapKeys.get1());
checkLocalEntries(cache0.localEntries(OFFHEAP, BACKUP), val, swapKeys.get2(), offheapKeys.get2());
checkLocalEntries(cache0.localEntries(OFFHEAP, PRIMARY, BACKUP), val, swap, offheap);
}
finally {
cache0.removeAll(new HashSet<>(primaryKeys));
cache0.removeAll(new HashSet<>(backupKeys));
}
}
/**
* @param nodeIdx Node index.
* @throws Exception If failed.
*/
private void checkLocalEntriesAffinityFilter(int nodeIdx) throws Exception {
IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
final int PUT_KEYS = 10;
List<Integer> primaryKeys = null;
List<Integer> backupKeys = null;
List<Integer> nearKeys = null;
try {
primaryKeys = primaryKeys(cache0, PUT_KEYS, 0);
backupKeys = backupKeys(cache0, PUT_KEYS, 0);
for (Integer key : primaryKeys)
cache0.put(key, String.valueOf(key));
for (Integer key : backupKeys)
cache0.put(key, String.valueOf(key));
nearKeys = cacheMode() == PARTITIONED ? nearKeys(cache0, PUT_KEYS, 0) : Collections.<Integer>emptyList();
for (Integer key : nearKeys)
cache0.put(key, String.valueOf(key));
log.info("Keys [near=" + nearKeys + ", primary=" + primaryKeys + ", backup=" + backupKeys + ']');
if (hasNearCache()) {
checkLocalEntries(cache0.localEntries(), nearKeys, primaryKeys, backupKeys);
checkLocalEntries(cache0.localEntries(ALL), nearKeys, primaryKeys, backupKeys);
checkLocalEntries(cache0.localEntries(NEAR), nearKeys);
checkLocalEntries(cache0.localEntries(PRIMARY, BACKUP, NEAR), nearKeys, primaryKeys, backupKeys);
checkLocalEntries(cache0.localEntries(NEAR, PRIMARY), nearKeys, primaryKeys);
checkLocalEntries(cache0.localEntries(NEAR, BACKUP), nearKeys, backupKeys);
}
else {
checkLocalEntries(cache0.localEntries(), primaryKeys, backupKeys);
checkLocalEntries(cache0.localEntries(ALL), primaryKeys, backupKeys);
checkLocalEntries(cache0.localEntries(NEAR));
checkLocalEntries(cache0.localEntries(NEAR, PRIMARY), primaryKeys);
checkLocalEntries(cache0.localEntries(NEAR, BACKUP), backupKeys);
checkLocalEntries(cache0.localEntries(PRIMARY, BACKUP, NEAR), primaryKeys, backupKeys);
}
checkLocalEntries(cache0.localEntries(PRIMARY), primaryKeys);
checkLocalEntries(cache0.localEntries(BACKUP), backupKeys);
checkLocalEntries(cache0.localEntries(PRIMARY, BACKUP), primaryKeys, backupKeys);
}
finally {
if (primaryKeys != null)
cache0.removeAll(new HashSet<>(primaryKeys));
if (backupKeys != null)
cache0.removeAll(new HashSet<>(backupKeys));
if (nearKeys != null)
cache0.removeAll(new HashSet<>(nearKeys));
}
}
/**
* @param entries Entries.
* @param exp Expected entries.
*/
private void checkLocalEntries(Iterable<Cache.Entry<Integer, String>> entries, Collection<Integer>... exp) {
checkLocalEntries(entries, "", exp);
}
/**
* @param entries Entries.
* @param expVal Expected value.
* @param exp Expected keys.
*/
private void checkLocalEntries(Iterable<Cache.Entry<Integer, String>> entries,
String expVal,
Collection<Integer>... exp) {
Set<Integer> allExp = new HashSet<>();
for (Collection<Integer> col : exp)
assertTrue(allExp.addAll(col));
for (Cache.Entry<Integer, String> e : entries) {
assertNotNull(e.getKey());
assertNotNull(e.getValue());
assertEquals(expVal + e.getKey(), e.getValue());
assertTrue("Unexpected entry: " + e, allExp.remove(e.getKey()));
}
assertTrue("Expected entries not found: " + allExp, allExp.isEmpty());
}
}