blob: 61bdcececf2e66a5760038a437d6c8f8a037b338 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.server.ServerLoad;
import org.apache.geode.test.junit.categories.MembershipTest;
/**
* Integration tests extracted from LocatorLoadSnapshotJUnitTest
*/
@Category({MembershipTest.class})
public class LocatorLoadSnapshotIntegrationTest {
/**
* A basic test of concurrent functionality. Starts a number of threads making requests and
* expects the load to be balanced between three servers.
*
*/
@Test
public void testConcurrentBalancing() throws InterruptedException {
int NUM_THREADS = 50;
final int NUM_REQUESTS = 10000;
int ALLOWED_THRESHOLD = 50; // We should never be off by more than
// the number of concurrent threads.
final LocatorLoadSnapshot sn = new LocatorLoadSnapshot();
final ServerLocation l1 = new ServerLocation("localhost", 1);
final ServerLocation l2 = new ServerLocation("localhost", 2);
final ServerLocation l3 = new ServerLocation("localhost", 3);
int initialLoad1 = (int) (Math.random() * (NUM_REQUESTS / 2));
int initialLoad2 = (int) (Math.random() * (NUM_REQUESTS / 2));
int initialLoad3 = (int) (Math.random() * (NUM_REQUESTS / 2));
sn.addServer(l1, new String[0], new ServerLoad(initialLoad1, 1, 0, 1));
sn.addServer(l2, new String[0], new ServerLoad(initialLoad2, 1, 0, 1));
sn.addServer(l3, new String[0], new ServerLoad(initialLoad3, 1, 0, 1));
final Map loadCounts = new HashMap();
loadCounts.put(l1, new AtomicInteger(initialLoad1));
loadCounts.put(l2, new AtomicInteger(initialLoad2));
loadCounts.put(l3, new AtomicInteger(initialLoad3));
Thread[] threads = new Thread[NUM_THREADS];
// final Object lock = new Object();
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new Thread("Thread-" + i) {
@Override
public void run() {
for (int ii = 0; ii < NUM_REQUESTS; ii++) {
ServerLocation location;
// synchronized(lock) {
location = sn.getServerForConnection(null, Collections.EMPTY_SET);
// }
AtomicInteger count = (AtomicInteger) loadCounts.get(location);
count.incrementAndGet();
}
}
};
}
for (int i = 0; i < NUM_THREADS; i++) {
threads[i].start();
}
for (int i = 0; i < NUM_THREADS; i++) {
Thread t = threads[i];
long ms = 30 * 1000;
t.join(30 * 1000);
if (t.isAlive()) {
for (int j = 0; j < NUM_THREADS; j++) {
threads[j].interrupt();
}
fail("Thread did not terminate after " + ms + " ms: " + t);
}
}
double expectedPerServer =
(initialLoad1 + initialLoad2 + initialLoad3 + NUM_REQUESTS * NUM_THREADS)
/ (double) loadCounts.size();
// for(Iterator itr = loadCounts.entrySet().iterator(); itr.hasNext(); ) {
// Map.Entry entry = (Entry) itr.next();
// ServerLocation location = (ServerLocation) entry.getKey();
// AI count= (AI) entry.getValue();
// }
for (Iterator itr = loadCounts.entrySet().iterator(); itr.hasNext();) {
Map.Entry entry = (Map.Entry) itr.next();
ServerLocation location = (ServerLocation) entry.getKey();
AtomicInteger count = (AtomicInteger) entry.getValue();
int difference = (int) Math.abs(count.get() - expectedPerServer);
assertTrue("Count " + count + " for server " + location + " is not within "
+ ALLOWED_THRESHOLD + " of " + expectedPerServer, difference < ALLOWED_THRESHOLD);
}
}
}