/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.util;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
import org.junit.Test;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.SINGLE_Q;
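/**
 * Multi-threaded tests for {@link ConcurrentLinkedHashMap}: concurrent puts, size-limited eviction with the
 * different queue policies, and key-set iteration while other threads keep modifying the map.
 */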
public class GridConcurrentLinkedHashMapMultiThreadedSelfTest extends GridCommonAbstractTest {
/**
 * Puts one million distinct keys from 10 threads into a map created without a maximum size argument and
 * checks that the map, its {@code sizex()} counter and its queue all report exactly that many entries.
 *
 * @throws Exception If failed.
 */
@Test
public void testPut() throws Exception {
    info(">>> Test grid concurrent linked hash map...");

    int keyCnt = 1000000;

    ConcurrentLinkedHashMap<Integer, String> linkedMap =
        new ConcurrentLinkedHashMap<>(1000, 0.75f, 64);

    // Nothing needs to be remembered per thread here, only the final sizes are verified.
    putMultiThreaded(linkedMap, 10, keyCnt, 0);

    // JUnit assertions instead of the 'assert' keyword: they still run when the JVM is started without -ea
    // and a failure reports the offending value.
    assertTrue("Invalid map size: " + linkedMap.size(), linkedMap.size() == keyCnt);
    assertTrue("Invalid map sizex: " + linkedMap.sizex(), linkedMap.sizex() == keyCnt);
    assertTrue("Invalid map queue size: " + linkedMap.queue().sizex(), linkedMap.queue().sizex() == keyCnt);
}
/**
 * Same scenario as the plain put test, but with the {@code PER_SEGMENT_Q} queue policy: one million distinct
 * keys are put from 10 threads and the map must report exactly that many entries.
 * <p>
 * {@code 0} is passed as the maximum size argument, and the test expects that no key is evicted.
 *
 * @throws Exception If failed.
 */
@Test
public void testPutPerSegment() throws Exception {
    info(">>> Test grid concurrent linked hash map...");

    int keyCnt = 1000000;

    // The per-segment policy is the whole point of this test; with SINGLE_Q it would not exercise
    // the per-segment code path its name promises.
    ConcurrentLinkedHashMap<Integer, String> linkedMap =
        new ConcurrentLinkedHashMap<>(1000, 0.75f, 64, 0, PER_SEGMENT_Q);

    putMultiThreaded(linkedMap, 10, keyCnt, 0);

    // JUnit assertions instead of the 'assert' keyword: they still run when the JVM is started without -ea
    // and a failure reports the offending value.
    assertTrue("Invalid map size: " + linkedMap.size(), linkedMap.size() == keyCnt);
    assertTrue("Invalid map sizex: " + linkedMap.sizex(), linkedMap.sizex() == keyCnt);
}
/**
 * Puts one million keys from 10 threads into a map limited to 1000 entries and checks that:
 * <ul>
 * <li>the key set, {@code size()}, {@code sizex()} and the queue size are all within 10 of the limit;</li>
 * <li>each surviving key, taken from the end of the key set backwards, is the most recently remembered
 * key of some producer thread.</li>
 * </ul>
 *
 * @throws Exception If failed.
 */
@Test
public void testEvict() throws Exception {
    info(">>> Test grid concurrent linked hash map...");

    final int maxSize = 1000;

    ConcurrentLinkedHashMap<Integer, String> linkedMap = new ConcurrentLinkedHashMap<>(
        32, 0.75f, 64, maxSize);

    int keyCnt = 1000000;

    int diff = 10; // 1% of 1000.

    // Every thread remembers its last maxSize keys.
    Map<String, LinkedList<Integer>> map = putMultiThreaded(linkedMap, 10, keyCnt, maxSize);

    LinkedList<Integer> keys = new LinkedList<>(linkedMap.keySet());

    assertTrue("Invalid key set size: " + keys.size(), U.safeAbs(maxSize - keys.size()) <= diff);
    assertTrue("Invalid map size: " + linkedMap.size(), U.safeAbs(maxSize - linkedMap.size()) <= diff);
    assertTrue("Invalid map sizex: " + linkedMap.sizex(), U.safeAbs(maxSize - linkedMap.sizex()) <= diff);
    assertTrue("Invalid map queue size: " + linkedMap.queue().sizex(),
        U.safeAbs(maxSize - linkedMap.queue().sizex()) <= diff);

    while (!keys.isEmpty()) {
        boolean found = false;

        int key = keys.removeLast();

        for (LinkedList<Integer> threadKeys : map.values()) {
            // peekLast() returns null for an empty per-thread list, whereas getLast() would throw
            // NoSuchElementException and hide the assertion message below.
            Integer last = threadKeys.peekLast();

            if (last != null && last == key) {
                // Consume the matched key so the next map key is compared against the thread's previous one.
                threadKeys.removeLast();

                found = true;

                break;
            }
        }

        assertTrue("Key was not found on the top of any thread: " + key, found);
    }

    int min = Integer.MAX_VALUE;
    int max = 0;

    int actKeyCnt = 0;

    // Count the keys again through a fresh key-set iteration and log the surviving key range.
    for (int key : linkedMap.keySet()) {
        min = Math.min(min, key);
        max = Math.max(max, key);

        actKeyCnt++;
    }

    info("Max: " + max);
    info("Min: " + min);

    assertTrue("Invalid keys count: " + actKeyCnt, U.safeAbs(maxSize - actKeyCnt) <= diff);
}
/**
 * Checks eviction with the {@code PER_SEGMENT_Q} queue policy.
 *
 * @throws Exception If failed.
 */
@Test
public void testEvictPerSegment() throws Exception {
    checkEvictPerSegment(PER_SEGMENT_Q);
}

/**
 * Checks eviction with the {@code PER_SEGMENT_Q_OPTIMIZED_RMV} queue policy.
 *
 * @throws Exception If failed.
 */
@Test
public void testEvictPerSegmentOptimizedRemoves() throws Exception {
    checkEvictPerSegment(PER_SEGMENT_Q_OPTIMIZED_RMV);
}

/**
 * Puts one million keys from 10 threads into a map limited to {@code 64 * 30} entries that uses the given
 * queue policy, then checks that the map size stays within 10% of the limit and that every surviving key is
 * among the keys remembered by the producer threads.
 *
 * @param plc Queue policy to create the map with.
 * @throws Exception If failed.
 */
private void checkEvictPerSegment(ConcurrentLinkedHashMap.QueuePolicy plc) throws Exception {
    info(">>> Test grid concurrent linked hash map...");

    int concurLvl = 64;

    final int maxSize = concurLvl * 30;

    int diff = (int)(maxSize * 0.1);

    ConcurrentLinkedHashMap<Integer, String> linkedMap = new ConcurrentLinkedHashMap<>(
        32, 0.75f, concurLvl, maxSize, plc);

    int keyCnt = 1000000;

    Map<String, LinkedList<Integer>> map = putMultiThreaded(
        linkedMap,
        10,
        keyCnt,
        maxSize * 10); // Intentionally memorize more than maxSize since in this mode LRU is not fair.

    assertTrue("Invalid map size: " + linkedMap.size(), U.safeAbs(maxSize - linkedMap.size()) <= diff);
    assertTrue("Invalid map sizex: " + linkedMap.sizex(), U.safeAbs(maxSize - linkedMap.sizex()) <= diff);

    // Merge the per-thread lists once so that each membership check is a hash lookup instead of a linear
    // scan over up to maxSize * 10 list elements per thread.
    Set<Integer> remembered = new HashSet<>();

    for (LinkedList<Integer> threadKeys : map.values())
        remembered.addAll(threadKeys);

    LinkedList<Integer> keys = new LinkedList<>(linkedMap.keySet());

    // Keys are checked from the end of the key-set snapshot backwards.
    while (!keys.isEmpty()) {
        int key = keys.removeLast();

        assertTrue("Key was not found in any thread: " + key, remembered.contains(key));
    }

    int min = Integer.MAX_VALUE;
    int max = 0;

    int actKeyCnt = 0;

    // Probe every key that was ever put: this counts the survivors through get() rather than through
    // the key set and gives the surviving key range for the log.
    for (int key = 0; key < keyCnt; key++) {
        if (linkedMap.get(key) != null) {
            min = Math.min(min, key);
            max = Math.max(max, key);

            actKeyCnt++;
        }
    }

    info("Max: " + max);
    info("Min: " + min);

    assertTrue("Invalid keys count: " + actKeyCnt, U.safeAbs(maxSize - actKeyCnt) <= diff);
}
/**
 * Fills the given map with the keys {@code 0 .. keyCnt - 1} from several producer threads. The threads draw
 * keys from one shared counter, so every key is put exactly once, always with the value {@code "value"}.
 * <p>
 * Each thread records the keys it has put in its own list, dropping the oldest entry whenever the list grows
 * beyond {@code rememberCnt}.
 *
 * @param map Map to fill.
 * @param threadCnt Number of producer threads.
 * @param keyCnt Total number of keys to put.
 * @param rememberCnt Maximum number of most recently put keys each thread keeps.
 * @return Remembered keys of every producer thread, keyed by thread name.
 * @throws Exception If failed.
 */
private Map<String, LinkedList<Integer>> putMultiThreaded(final ConcurrentMap<Integer, String> map, int threadCnt,
    final int keyCnt, final int rememberCnt)
    throws Exception {
    long startTs = U.currentTimeMillis();

    final AtomicInteger nextKey = new AtomicInteger();

    final ConcurrentHashMap<String, LinkedList<Integer>> keysByThread = new ConcurrentHashMap<>();

    Callable<Object> producer = new Callable<Object>() {
        @Nullable @Override public Object call() throws Exception {
            LinkedList<Integer> recent = new LinkedList<>();

            // Register this thread's list under its name; the assert checks nobody used that name before.
            LinkedList<Integer> prev = keysByThread.put(Thread.currentThread().getName(), recent);

            assert prev == null;

            // Keep drawing keys from the shared counter until the requested amount is exhausted.
            for (Integer key = nextKey.getAndIncrement(); key < keyCnt; key = nextKey.getAndIncrement()) {
                map.put(key, "value");

                recent.addLast(key);

                // Trim from the head so that only the newest keys are retained.
                if (recent.size() > rememberCnt)
                    recent.removeFirst();
            }

            return null;
        }
    };

    IgniteInternalFuture<?> fut = multithreadedAsync(producer, threadCnt, "producer");

    // Wait for all producers before reporting and handing out their lists.
    fut.get();

    info("Put finished [keyCnt=" + keyCnt + ", threadCnt=" + threadCnt +
        ", duration=" + (U.currentTimeMillis() - startTs) + ']');

    return keysByThread;
}
/**
 * Pre-fills a map with 10000 keys and, while 10 updater threads keep re-putting random existing keys,
 * repeatedly checks that the key set still contains every original key.
 *
 * @throws Exception If failed.
 */
@Test
public void testInsertOrderIterator() throws Exception {
    info(">>> Test grid concurrent linked hash map iterator...");

    final int keyCnt = 10000;

    final Map<Integer, String> linkedMap =
        new ConcurrentLinkedHashMap<>();

    Set<Integer> original = new HashSet<>();

    int k = 0;

    while (k < keyCnt) {
        linkedMap.put(k, "value" + k);

        original.add(k);

        k++;
    }

    // Flag the updaters poll; cleared once the checks are done.
    final AtomicBoolean run = new AtomicBoolean(true);

    long start = System.currentTimeMillis();

    // Updaters only overwrite existing keys, they never remove anything.
    Callable<Object> updater = new Callable<Object>() {
        @Nullable @Override public Object call() throws Exception {
            Random rnd = new Random();

            while (run.get()) {
                int key = rnd.nextInt(keyCnt);

                linkedMap.put(key, "value" + key);
            }

            return null;
        }
    };

    IgniteInternalFuture<?> fut = multithreadedAsync(updater, 10, "updater");

    try {
        // Whatever is left after subtracting the current key set are keys that went missing.
        for (int left = 10000; left > 0; left--) {
            Collection<Integer> missing = new HashSet<>(original);

            missing.removeAll(linkedMap.keySet());

            assertTrue("Keys disappeared from map: " + missing, missing.isEmpty());
        }

        info(">>> Iterator test complete [duration = " + (System.currentTimeMillis() - start) + ']');
    }
    finally {
        // Always stop the updaters, even if a check failed.
        run.set(false);

        fut.get();
    }
}
/**
 * Runs the iterator consistency scenario implemented by {@link #testGetRemovePutIterator()}.
 *
 * @throws Exception If failed.
 */
@Test
public void testIteratorModificationInsertOrder() throws Exception {
    this.testGetRemovePutIterator();
}
/**
 * Runs the concurrent get / remove / put scenario on a pre-filled map; every original key must still be
 * present afterwards.
 *
 * @throws Exception If failed.
 */
@Test
public void testInsertOrderGetRemovePut() throws Exception {
    final boolean expectEmpty = false;

    testPutGetRemove(expectEmpty);
}
/**
 * Runs the concurrent put / get / remove scenario on an initially empty map; the map must be empty
 * afterwards.
 *
 * @throws Exception If failed.
 */
@Test
public void testInsertOrderPutGetRemove() throws Exception {
    final boolean expectEmpty = true;

    testPutGetRemove(expectEmpty);
}
/**
 * Runs 10 threads that each perform 100000 rounds of put, get and remove on random keys of a concurrent
 * linked hash map and verifies the final content.
 * <p>
 * With {@code clear == true} the map starts empty and each round is put, get, remove, so the map must end up
 * empty. With {@code clear == false} the map is pre-filled and each round is get, remove, put, so every
 * original key must be present at the end.
 *
 * @param clear {@code true} if test should expect clear map at the end.
 * @throws Exception If failed.
 */
private void testPutGetRemove(final boolean clear) throws Exception {
    info(">>> Test grid concurrent linked hash map iterator...");

    final int keyCnt = 10000;

    final ConcurrentLinkedHashMap<Integer, String> linkedMap =
        new ConcurrentLinkedHashMap<>();

    Collection<Integer> original = new HashSet<>();

    // Only the "keys survive" variant starts from a populated map.
    if (!clear) {
        int k = 0;

        while (k < keyCnt) {
            linkedMap.put(k, "value" + k);

            original.add(k);

            k++;
        }
    }

    long start = System.currentTimeMillis();

    Callable<Object> updater = new Callable<Object>() {
        @Nullable @Override public Object call() throws Exception {
            Random rnd = new Random();

            for (int left = 100000; left > 0; left--) {
                int key = rnd.nextInt(keyCnt);

                String val = "value" + key;

                // The put goes first when the map must end up empty ...
                if (clear)
                    linkedMap.put(key, val);

                linkedMap.get(key);
                linkedMap.remove(key);

                // ... and last when every key must end up present.
                if (!clear)
                    linkedMap.put(key, val);
            }

            return null;
        }
    };

    IgniteInternalFuture<?> fut = multithreadedAsync(updater, 10, "updater");

    fut.get();

    Set<Integer> keys = linkedMap.keySet();

    if (!clear) {
        // Anything left in 'original' is a key the map has lost.
        original.removeAll(keys);

        assertTrue("Keys must be in map: " + original, original.isEmpty());
    }
    else
        assertTrue("Keys must not be in map " + keys, keys.isEmpty());

    info(">>> put get remove test complete [duration = " + (System.currentTimeMillis() - start) + ']');
}
/**
 * Test multithreaded operations in concurrent linked hash map and iterator consistency.
 * <p>
 * A map pre-filled with 10000 keys is modified by 10 updater threads, each repeatedly doing get, remove and
 * put on a random key. Meanwhile the key set is iterated 10000 times and each pass must not return the same
 * key twice. After the updaters are stopped, every original key must be present in the map.
 *
 * @throws Exception If failed.
 */
@Test
public void testGetRemovePutIterator() throws Exception {
    info(">>> Test grid concurrent linked hash map iterator...");

    final ConcurrentLinkedHashMap<Integer, String> linkedMap =
        new ConcurrentLinkedHashMap<>();

    Collection<Integer> original = new HashSet<>();

    final int keyCnt = 10000;

    for (int i = 0; i < keyCnt; i++) {
        linkedMap.put(i, "value" + i);

        original.add(i);
    }

    final AtomicBoolean run = new AtomicBoolean(true);

    long start = System.currentTimeMillis();

    // Updater threads.
    IgniteInternalFuture<?> fut = multithreadedAsync(
        new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                Random rnd = new Random();

                while (run.get()) {
                    int key = rnd.nextInt(keyCnt);

                    linkedMap.get(key);
                    linkedMap.remove(key);
                    linkedMap.put(key, "value" + key);
                }

                info(">>> Exiting updater thread");

                return null;
            }
        },
        10,
        "updater"
    );

    try {
        int iterCnt = 10000;

        for (int i = 0; i < iterCnt; i++) {
            Iterator<Integer> it = linkedMap.keySet().iterator();

            Collection<Integer> keys = new HashSet<>();

            // Updaters temporarily remove keys, so a pass may miss some of them; only uniqueness is
            // asserted here, completeness is verified after the updaters have stopped.
            while (it.hasNext()) {
                int key = it.next();

                assertFalse("Duplicate key: " + key, keys.contains(key));

                keys.add(key);
            }

            if (i % 500 == 0)
                info(">>> Run " + i + " iterations in " + (System.currentTimeMillis() - start) + "ms");
        }
    }
    finally {
        // Stop the updaters even if an assertion above failed, otherwise they would keep spinning.
        info(">>> Stopping updater threads");

        run.set(false);

        fut.get();
    }

    info(">>> Updater threads stopped, will verify integrity of result map");

    Set<Integer> keys = linkedMap.keySet();

    // Every updater round ends with a put, so no original key may be missing now.
    original.removeAll(keys);

    assertTrue("Keys must be in map: " + original, original.isEmpty());

    info(">>> put get remove test complete [duration = " + (System.currentTimeMillis() - start) + ']');
}
}