// blob: 0a4502cb386cf302e3d2ddf3403137526642d9ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.jcas.impl;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import org.apache.uima.internal.util.Misc;
import org.apache.uima.internal.util.MultiThreadUtils;
import org.apache.uima.jcas.cas.TOP;
import junit.framework.TestCase;
public class JCasHashMapTest extends TestCase {
// Number of keys held in the addrs table that the tests draw from.
static final int SIZE = 20000; // set > 2 million for cache avoidance timing tests
// Generator used to build the key table; it is re-seeded in the static block below.
static final Random r = new Random();
// Seed drawn from the not-yet-seeded generator. To replay a run, replace the
// initializer with a fixed value such as one of the literals below.
static final long SEED = r.nextLong();
// -6419339010654937562L; // causes skew
// 12345;
// Report the seed and then apply it, so the printed value identifies the key table of this run.
static {
System.out.println("JCasHashMapTest load: set random seed to " + SEED);
r.setSeed(SEED);
}
// Key table: SIZE distinct positive ints in shuffled order, filled in by createAddrs().
static private int[] addrs = new int[SIZE];
// Static initializers run in textual order, so this must stay after the seeding
// block and after the addrs declaration.
static { createAddrs(); }
/**
 * (Re)populates {@code addrs} with SIZE distinct positive ints in random order.
 *
 * Values are produced as a strictly increasing sequence (each step adds 1..14),
 * which guarantees uniqueness, and are then shuffled in place.
 */
private static void createAddrs() {
  // strictly increasing => no duplicates
  int last = 0;
  for (int slot = 0; slot < SIZE; slot++) {
    last += r.nextInt(14) + 1;
    addrs[slot] = last;
  }

  // in-place shuffle, walking from the tail toward the head
  int tail = SIZE - 1;
  while (tail >= 1) {
    final int pick = r.nextInt(tail + 1);
    final int held = addrs[pick];
    addrs[pick] = addrs[tail];
    addrs[tail] = held;
    tail--;
  }
}
// Gate used by testMultiThreadCollide: the worker running the putIfAbsent creator
// polls this flag and only returns once the main thread has set it to true.
private final AtomicBoolean okToProceed = new AtomicBoolean();
/**
 * Checks how the constructors reconcile capacity and concurrency level, for
 * default concurrency levels 1, 2, 4, ... 128.
 *
 * With only a capacity given, a capacity of 32 per level keeps the default level,
 * while 16 per level is expected to halve it (but never below 1).
 * With an explicit level, the reported capacity is expected to be 32 per level
 * even when less was requested.
 */
public void testBasic() {
  for (int level = 1; level <= 128; level <<= 1) {
    JCasHashMap.setDEFAULT_CONCURRENCY_LEVEL(level);

    // default concurrency level is adjusted down when the capacity is too small for it
    assertEquals(level, new JCasHashMap(32 * level).getConcurrencyLevel());
    assertEquals(Math.max(1, level / 2), new JCasHashMap(16 * level).getConcurrencyLevel());

    // capacity is adjusted up to fit the explicitly requested concurrency level
    final int expectedCapacity = 32 * level;
    for (int requested : new int[] {32 * level, 31 * level, 16 * level}) {
      assertEquals(expectedCapacity, new JCasHashMap(requested, level).getCapacity());
    }
  }
}
/**
 * Runs the timed insertion pass five times over the full key table (each pass
 * prints its own timing), then runs one insert-and-read-back verification pass.
 */
public void testWithPerf() {
  int remaining = 5;
  while (remaining-- > 0) {
    arun(SIZE);
  }
  arunCk(SIZE);
}
/**
 * Hammers a single JCasHashMap with putIfAbsent calls from several threads.
 *
 * For each default concurrency level 2, 4, ... up to the core count, a fresh map is
 * created and handed to MultiThreadUtils.tstMultiThread together with a task and a
 * Runnable that clears the map. The task makes 4 passes of SIZE/4 inserts, each on a
 * key picked at random from the first SIZE/16 table entries, so threads collide on
 * the same keys frequently. No result is asserted here; the test passes if nothing throws.
 *
 * @throws Exception propagated from MultiThreadUtils.tstMultiThread
 */
public void testMultiThread() throws Exception {
  final Random random = new Random();
  final int numberOfThreads = Misc.numberOfCores;
  System.out.format("test JCasHashMap with up to %d threads%n", numberOfThreads);

  for (int th = 2; th <= numberOfThreads; th *= 2) {
    JCasHashMap.setDEFAULT_CONCURRENCY_LEVEL(th);
    final JCasHashMap m = new JCasHashMap(200);

    MultiThreadUtils.Run2isb run2isb = new MultiThreadUtils.Run2isb() {
      @Override
      public void call(int threadNumber, int repeatNumber, StringBuilder sb) {
        for (int k = 0; k < 4; k++) {
          for (int i = 0; i < SIZE / 4; i++) {
            final int key = addrs[random.nextInt(SIZE / 16)];
            m.putIfAbsent(key, x -> TOP._createSearchKey(x));
          }
          try {
            // sub-microsecond pause to vary how the threads interleave
            Thread.sleep(0, random.nextInt(1000));
          } catch (InterruptedException e) {
            // Someone asked this thread to stop: keep the interrupt visible to the
            // caller instead of swallowing it, and end this pass.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    };

    MultiThreadUtils.tstMultiThread("JCasHashMapTest", numberOfThreads, 10, run2isb,
        new Runnable() {
          @Override
          public void run() {
            m.clear();
          }
        });
  }
}
/**
 * Cross-checks JCasHashMap.putIfAbsent against a ConcurrentHashMap while several
 * threads insert colliding keys.
 *
 * The creator lambda records every instance it makes in the shared {@code check} map.
 * When a putIfAbsent call did not run this thread's creator, the instance it
 * returned must be the very same object that {@code check} holds for that key;
 * otherwise a RuntimeException describing the mismatch is thrown.
 * Both maps are cleared by the Runnable passed to MultiThreadUtils.tstMultiThread.
 *
 * @throws Exception propagated from MultiThreadUtils.tstMultiThread
 */
public void testMultiThreadCompare() throws Exception {
  final Random random = new Random();
  // random.setSeed(1234L); // debug
  final int numberOfThreads = Misc.numberOfCores;
  System.out.format("test JCasHashMap with compare with up to %d threads%n", numberOfThreads);

  // one check map, run on multiple threads
  final ConcurrentMap<Integer, TOP> check =
      new ConcurrentHashMap<Integer, TOP>(SIZE, .5F, numberOfThreads * 2);

  for (int th = 2; th <= numberOfThreads; th *= 2) {
    JCasHashMap.setDEFAULT_CONCURRENCY_LEVEL(th);
    final JCasHashMap m = new JCasHashMap(200); // one JCasHashMap run on multiple threads

    MultiThreadUtils.Run2isb run2isb = new MultiThreadUtils.Run2isb() {
      @Override
      public void call(int threadNumber, int repeatNumber, StringBuilder sb) {
        for (int k = 0; k < 4; k++) {
          for (int i = 0; i < SIZE / 4; i++) {
            final int key = addrs[random.nextInt(SIZE / 16)];
            // one-element holder so the lambda can report whether it was invoked
            final TOP[] createdFS = new TOP[1];
            final TOP fs = m.putIfAbsent(key, x -> {
              TOP tmp = createdFS[0] = TOP._createSearchKey(x);
              check.put(key, tmp);
              return tmp;
            });
            if (createdFS[0] != null) {
              continue; // this call created the entry itself; nothing to compare
            }
            final TOP fscheck = check.get(key);
            if (fscheck == null || fscheck != fs) {
              // Both ids are rendered null-safely so that a null result is reported
              // as a miscompare rather than masked by a NullPointerException.
              String msg = String.format(
                  "JCasHashMapTest miscompare, repeat=%,d, count=%,d key=%,d"
                      + ", checkKey=%s JCasHashMapKey=%s",
                  k, i, key,
                  (null == fscheck) ? "null" : Integer.toString(fscheck._id()),
                  (null == fs) ? "null" : String.format("%,d", fs._id()));
              System.err.println(msg);
              throw new RuntimeException(msg);
            }
          }
          try {
            // sub-microsecond pause to vary how the threads interleave
            Thread.sleep(0, random.nextInt(1000));
          } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it,
            // and end this pass.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    };

    MultiThreadUtils.tstMultiThread("JCasHashMapTest", numberOfThreads, 10, run2isb,
        new Runnable() {
          @Override
          public void run() {
            check.clear();
            m.clear();
          }
        });
  }
}
/**
 * Verifies that simultaneous putIfAbsent calls for one key all yield the same instance.
 *
 * Every worker thread loops: wait for the "go" signal, sleep a short random time,
 * then putIfAbsent a fixed key. The creator lambda raises the running thread's
 * utilBoolean and then polls {@code okToProceed} before returning the shared
 * {@code fs}. While one creator is parked like that, the test expects every other
 * thread's utilBoolean to be down (they are presumably blocked inside putIfAbsent).
 * Once the main thread sets {@code okToProceed}, all workers must end up with the
 * identical {@code fs} instance.
 *
 * Skipped when fewer than 2 cores are reported.
 *
 * @throws Exception if the main thread is interrupted while sleeping, or propagated
 *                   from the MultiThreadUtils helpers
 */
public void testMultiThreadCollide() throws Exception {
  final int numberOfThreads = Misc.numberOfCores;
  if (numberOfThreads < 2) {
    return;
  }
  System.out.format("test JCasHashMap collide with up to %d threads%n", numberOfThreads);

  final Thread thisThread = Thread.currentThread();
  final int subThreadPriority = thisThread.getPriority();
  // Run this thread one notch below the workers. Clamp at MIN_PRIORITY: setPriority
  // rejects values below it with an IllegalArgumentException.
  thisThread.setPriority(Math.max(Thread.MIN_PRIORITY, subThreadPriority - 1));
  try {
    final MultiThreadUtils.ThreadM[] threads = new MultiThreadUtils.ThreadM[numberOfThreads];
    final JCasHashMap m = new JCasHashMap(200);
    final Random sleepRandom = new Random(); // randomizes the pre-insert pause of each worker
    final int hashKey = 15;
    final TOP fs = TOP._createSearchKey(hashKey);
    final TOP[] found = new TOP[numberOfThreads];

    for (int i = 0; i < numberOfThreads; i++) {
      final int finalI = i;
      threads[i] = new MultiThreadUtils.ThreadM() {
        /**
         * Until told to terminate: wait for go, pause briefly, then store the
         * result of putIfAbsent(hashKey) into found[thread#].
         */
        @Override
        public void run() {
          while (MultiThreadUtils.wait4go(this)) {
            MultiThreadUtils.sleep(sleepRandom.nextInt(500000)); // 0-500 microseconds
            found[finalI] = m.putIfAbsent(hashKey, k -> {
              // k is ignored, hashKey is final, the fs returned is constant
              threads[finalI].utilBoolean.set(true);
              try {
                while (!okToProceed.get()) {
                  try {
                    Thread.sleep(5); // 5 milli
                  } catch (InterruptedException ignored) {
                    // deliberately ignored: only okToProceed releases this creator
                  }
                }
                return fs;
              } finally {
                threads[finalI].utilBoolean.set(false);
              }
            });
          }
        }
      };
      threads[i].setPriority(subThreadPriority);
      threads[i].start();
    }

    try {
      for (int loopCount = 0; loopCount < 10; loopCount++) {
        System.out.println(" JCasHashMap collide loop count is " + loopCount);
        for (int th = 2; th <= numberOfThreads; th *= 2) {
          // NOTE(review): m was constructed before this loop; whether changing the
          // default level affects an existing map is not visible here - confirm.
          JCasHashMap.setDEFAULT_CONCURRENCY_LEVEL(th);
          Arrays.fill(found, null);
          m.clear();
          // release the threads
          MultiThreadUtils.kickOffThreads(threads);

          // Poll until some thread is inside the creator; it may take a while for
          // the threads to get going.
          int numberWaiting;
          boolean creatorRunning;
          do {
            Thread.sleep(5);
            numberWaiting = 0;
            creatorRunning = false;
            for (int i = 0; i < numberOfThreads; i++) {
              if (threads[i].utilBoolean.get()) {
                creatorRunning = true;
              } else {
                numberWaiting++;
              }
            }
          } while (!creatorRunning);

          // exactly one thread may be inside the creator; all others must not be
          assertEquals(numberOfThreads - 1, numberWaiting);

          okToProceed.set(true);
          MultiThreadUtils.waitForAllReady(threads);
          okToProceed.set(false);

          for (TOP f : found) {
            if (f != fs) {
              System.err.format("JCasHashMapTest miscompare fs = %s, f = %s%n", fs, (f == null) ? "null" : f);
            }
            assertSame(fs, f);
          }
        }
      }
    } finally {
      // Also reached when an assertion fails: release a creator that may still be
      // polling okToProceed, so the workers can be shut down instead of being leaked.
      okToProceed.set(true);
      MultiThreadUtils.terminateThreads(threads);
    }
  } finally {
    // give the calling thread its original priority back
    thisThread.setPriority(subThreadPriority);
  }
}
// private void arun2(int n) {
// JCasHashMap2 m = new JCasHashMap2(200, true);
// assertTrue(m.size() == 0);
// assertTrue(m.getbitsMask() == 0x000000ff);
//
// JCas jcas = null;
//
// long start = System.currentTimeMillis();
// for (int i = 0; i < n; i++) {
// TOP fs = new TOP(7 * i, NULL_TOP_TYPE_INSTANCE);
// TOP v = m.get(fs.getAddress());
// if (null == v) {
// m.putAtLastProbeAddr(fs);
// }
// }
// System.out.format("time for v2 %,d is %,d ms%n",
// n, System.currentTimeMillis() - start);
// m.showHistogram();
//
// }
/**
 * Inserts the first {@code n} keys of the key table into a fresh map, asserts the
 * resulting size, and prints the elapsed time and the map's histogram.
 *
 * @param n number of keys to insert; must not exceed the key table length
 */
private void arun(int n) {
  JCasHashMap m = new JCasHashMap(200);
  assertEquals(0, m.getApproximateSize());

  long start = System.nanoTime();
  for (int i = 0; i < n; i++) {
    final int key = addrs[i];
    m.putIfAbsent(key, k -> TOP._createSearchKey(k));
  }
  long stop = System.nanoTime();

  // expected value goes first so a failure message reads the right way round
  assertEquals(n, m.getApproximateSize());
  System.out.format("time for v1 %,d is %,d microsecs%n",
      n, (stop - start) / 1000);
  m.showHistogram();
}
/**
 * Inserts the first {@code n} keys of the key table into a fresh map and then
 * asserts that a lookup of each of those keys returns a non-null entry.
 *
 * @param n number of keys to insert and read back
 */
private void arunCk(int n) {
  final JCasHashMap map = new JCasHashMap(200);

  // phase 1: populate
  int idx = 0;
  while (idx < n) {
    map.putIfAbsent(addrs[idx], k -> TOP._createSearchKey(k));
    idx++;
  }

  // phase 2: every inserted key must be retrievable
  for (idx = 0; idx < n; idx++) {
    final TOP hit = (TOP) map.get(addrs[idx]);
    assertNotNull(hit);
  }
}
/**
 * Exercises sub-map capacities across fill and clear cycles, for default
 * concurrency levels 2, 4, ... 128.
 *
 * For each level a map with 32 slots per level is filled up to 60% of its capacity
 * and then by one more key; after each step every sub-map capacity must be either
 * 32 or 64. The fill is repeated after a clear, followed by three more clears;
 * only after the last one must every sub-map be back at exactly 32.
 *
 * @throws DataFormatException raised via fill() when the keys spread too unevenly
 *         over the sub-maps; the caller retries with a new key table
 */
private void innerTstGrowth() throws DataFormatException {
  final double loadFactor = .6; // from JCasHashMap impl
  final int perSubCapacity = 32; // from JCasHashMap impl

  for (int level = 2; level <= 128; level *= 2) { // 2 4 8 16 32 64 128
    JCasHashMap.setDEFAULT_CONCURRENCY_LEVEL(level);
    final int totalCapacity = level * perSubCapacity;
    final JCasHashMap map = new JCasHashMap(totalCapacity);
    assertEquals(0, map.getApproximateSize());
    assertEquals(totalCapacity, map.getCapacity());

    final int switchPoint = (int) Math.floor(totalCapacity * loadFactor);

    // ---- first round: fill, add one more, clear
    fill(switchPoint, map);
    System.out.print("JCasHashMapTest: after fill to switch point: ");
    assertTrue(checkSubsCapacity(map, perSubCapacity));

    System.out.print("JCasHashMapTest: after 1 past switch point: ");
    final int pastSwitchKey = addrs[switchPoint + 1]; // a key that fill() did not use
    map.putIfAbsent(pastSwitchKey, k -> TOP._createSearchKey(k));
    assertTrue(checkSubsCapacity(map, perSubCapacity));

    map.clear();
    System.out.print("JCasHashMapTest: after clear: ");
    assertTrue(checkSubsCapacity(map, perSubCapacity));

    // ---- second round: same fill, then repeated clears
    fill(switchPoint, map);
    System.out.print("JCasHashMapTest: after fill to switch point: ");
    assertTrue(checkSubsCapacity(map, perSubCapacity));

    map.putIfAbsent(pastSwitchKey, k -> TOP._createSearchKey(k));
    System.out.print("JCasHashMapTest: after 1 past switch point: ");
    assertTrue(checkSubsCapacity(map, perSubCapacity));

    map.clear(); // size is above switchpoint, so no shrinkage
    System.out.print("JCasHashMapTest: after clear (size above sp: ");
    assertTrue(checkSubsCapacity(map, perSubCapacity));

    map.clear(); // size is 0, so first time shrinkage a possibility
    System.out.print("JCasHashMapTest: clear (size below sp: ");
    assertTrue(checkSubsCapacity(map, perSubCapacity)); // but we don't shrink on first time

    map.clear();
    System.out.print("JCasHashMapTest: clear (size below 2nd time: ");
    assertTrue(checkSubsCapacity(map, perSubCapacity, perSubCapacity)); // but we do on second time
  }
}
/**
 * Runs the growth scenario, retrying with a freshly generated key table whenever
 * the scenario reports excessive skew (signalled by a DataFormatException).
 */
public void testGrowth() {
  System.out.println("JCasHashMapTest growth");
  for (;;) { // loop for skew retry
    try {
      innerTstGrowth();
      return; // completed without excessive skew
    } catch (DataFormatException e) { // hijacked this exception to avoid making a custom one
      System.out.println("\n*******************\nJCasHashMapTest growth excessive skew retry\n*******************\n");
      createAddrs(); // do a new set, hope it has less skew
    }
  }
}
/**
 * Convenience form: accepts a sub-map capacity of either {@code v} or twice {@code v}.
 *
 * @param m the map whose sub-map capacities are inspected
 * @param v the base capacity
 * @return true if every sub-map capacity is v or 2*v
 */
private boolean checkSubsCapacity(JCasHashMap m, int v) {
  final int doubled = v * 2;
  return checkSubsCapacity(m, v, doubled);
}
/**
 * Checks that every sub-map capacity equals one of two allowed values.
 *
 * On success prints one character per sub-map ('.' for {@code v}, '+' otherwise);
 * on the first mismatch prints the full capacity list to stderr.
 *
 * @param m  the map whose sub-map capacities are inspected
 * @param v  first allowed capacity
 * @param v2 second allowed capacity
 * @return true if all capacities are v or v2, false otherwise
 */
private boolean checkSubsCapacity(JCasHashMap m, int v, int v2) {
  final int[] caps = m.getCapacities();
  for (int idx = 0; idx < caps.length; idx++) {
    final int cap = caps[idx];
    if (cap != v && cap != v2) {
      System.err.format("expected %d or %d, but got %s%n", v, v2, intList(caps));
      return false;
    }
  }
  System.out.format("%s%n", intListPm(caps, v));
  return true;
}
/**
 * Signals excessive skew: throws if any sub-map holds at least twice the average
 * number of entries (average computed with integer division).
 *
 * @param m the map whose per-sub-map sizes are inspected
 * @param n the total number of entries that were inserted
 * @return always false; skew is reported by the exception instead
 * @throws DataFormatException if a sub-map size reaches the threshold
 */
private boolean isSkewed(JCasHashMap m, int n) throws DataFormatException {
  final int[] perSubMap = m.getSubSizes();
  // e.g. 128 entries over 4 sub-maps: average 32, threshold 64
  final int threshold = (n / perSubMap.length) * 2;
  for (int idx = 0; idx < perSubMap.length; idx++) {
    if (perSubMap[idx] >= threshold) {
      throw new DataFormatException();
    }
  }
  return false;
}
/**
 * Renders the values as text, each one followed by ", " (including the last).
 *
 * @param a the values to render
 * @return the concatenated text; empty for an empty array
 */
private String intList(int[] a) {
  final StringBuilder out = new StringBuilder();
  for (int idx = 0; idx < a.length; idx++) {
    out.append(a[idx]);
    out.append(", ");
  }
  return out.toString();
}
/**
 * Renders one character per value: '.' where the value equals {@code smaller},
 * '+' for anything else.
 *
 * @param a       the values to render
 * @param smaller the value to be shown as '.'
 * @return a string with exactly one character per element of {@code a}
 */
private String intListPm(int[] a, int smaller) {
  final char[] marks = new char[a.length];
  for (int idx = 0; idx < marks.length; idx++) {
    if (a[idx] == smaller) {
      marks[idx] = '.';
    } else {
      marks[idx] = '+';
    }
  }
  return new String(marks);
}
/**
 * Renders the ids of the given instances, each followed by ", "; a null element
 * is rendered as the text "null".
 *
 * @param a the instances to render
 * @return the concatenated text; empty for an empty array
 */
private String intList(TOP[] a) {
  final StringBuilder out = new StringBuilder();
  for (int idx = 0; idx < a.length; idx++) {
    final TOP item = a[idx];
    if (item == null) {
      out.append("null");
    } else {
      out.append(item._id());
    }
    out.append(", ");
  }
  return out.toString();
}
/**
 * Inserts the first {@code n} keys of the key table into {@code m}, then checks
 * how evenly they spread over the sub-maps.
 *
 * @param n number of keys to insert
 * @param m the map to fill
 * @throws DataFormatException if the skew check fails, so the test can be retried from the top
 */
private void fill (int n, JCasHashMap m) throws DataFormatException {
  int next = 0;
  while (next < n) {
    final int key = addrs[next++];
    m.putIfAbsent(key, k -> TOP._createSearchKey(k));
  }
  isSkewed(m, n); // throws if skewed to retry test from top
}
}