blob: 13fd4b57b240ed3723a22232c337f61626b6a91a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.network.security;
import com.cloud.utils.Profiler;
import junit.framework.TestCase;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Multi-threaded tests for a {@link LocalSecurityGroupWorkQueue} accessed through the
 * {@link SecurityGroupWorkQueue} interface.
 *
 * <p>Producer threads submit sets of VM ids via {@code submitWorkForVms} and consumer
 * threads pull work via {@code getWork}; each test then checks the resulting
 * {@code queue.size()} against the number of distinct VM ids submitted and/or the
 * number of jobs dequeued.
 */
public class SecurityGroupQueueTest extends TestCase {
    /** Queue under test. Shared by all producer/consumer threads; each scenario clears it first. */
    public final static SecurityGroupWorkQueue queue = new LocalSecurityGroupWorkQueue();

    /**
     * Runnable that submits the VM ids {@code 1..maxVmId} to the shared queue in a single
     * {@code submitWorkForVms} call and remembers the value that call returned.
     */
    public static class Producer implements Runnable {
        int _maxVmId = 0;
        int _newWorkQueued = 0;
        Set<Long> vmIds = new HashSet<Long>();

        /**
         * @param maxVmId highest VM id to submit; ids {@code 1..maxVmId} are prepared up front
         */
        public Producer(int maxVmId) {
            this._maxVmId = maxVmId;
            for (long i = 1; i <= _maxVmId; i++) {
                vmIds.add(i);
            }
        }

        @Override
        public void run() {
            _newWorkQueued = queue.submitWorkForVms(vmIds);
        }

        /**
         * @return the value returned by {@code submitWorkForVms}; 0 until {@link #run()} has executed
         */
        public int getNewWork() {
            return _newWorkQueued;
        }

        /**
         * @return the {@code maxVmId} this producer was created with
         */
        public int getTotalWork() {
            return _maxVmId;
        }
    }

    /**
     * Runnable that asks the shared queue for a fixed number of jobs via {@code getWork}
     * and records how many were actually returned.
     */
    public static class Consumer implements Runnable {
        private int _numJobsToDequeue = 0;
        private int _numJobsDequeued = 0;

        /**
         * @param numJobsToDequeu number of jobs to request from the queue
         */
        public Consumer(int numJobsToDequeu) {
            this._numJobsToDequeue = numJobsToDequeu;
        }

        @Override
        public void run() {
            List<SecurityGroupWork> result = new ArrayList<SecurityGroupWork>();
            try {
                result = queue.getWork(_numJobsToDequeue);
            } catch (InterruptedException e) {
                // An assertion failure thrown here would not reach the test thread, so report
                // the interruption, restore the interrupt status, and fall through with the
                // empty result: the dequeue-count assertions in the tests will then fail.
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            this._numJobsDequeued = result.size();
        }

        /**
         * @return the number of jobs this consumer requests
         */
        int getNumJobsToDequeue() {
            return _numJobsToDequeue;
        }

        /**
         * @return the size of the list returned by {@code getWork}; 0 if not yet run or interrupted
         */
        int getNumJobsDequeued() {
            return _numJobsDequeued;
        }
    }

    /**
     * Waits for {@code thread} to finish. If the waiting (test) thread is interrupted, the
     * interrupt status is restored and the test fails, because any assertion made afterwards
     * would be evaluated while worker threads might still be running.
     */
    private static void joinOrFail(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            fail("Interrupted while waiting for " + thread.getName() + " to finish");
        }
    }

    /** Waits, in array order, for every thread in {@code threads} to finish. */
    private static void joinAll(Thread[] threads) {
        for (int i = 0; i < threads.length; i++) {
            joinOrFail(threads[i]);
        }
    }

    /**
     * Starts 50 producers where producer {@code i} submits VM ids {@code 1..i+1}. The distinct
     * ids across all producers are {@code 1..50}, and the test expects the queue size to equal
     * that count.
     */
    public void testNumJobsEqToNumVms1() {
        queue.clear();
        final int numProducers = 50;
        Thread[] pThreads = new Thread[numProducers];
        for (int i = 0; i < numProducers; i++) {
            pThreads[i] = new Thread(new Producer(i + 1));
            pThreads[i].start();
        }
        joinAll(pThreads);
        System.out.println("Num Vms= " + numProducers + " Queue size = " + queue.size());
        assertEquals(numProducers, queue.size());
    }

    /**
     * Parameterized scenario invoked by {@link #testNumJobsEqToNumVms3()}: every producer submits
     * the same ids {@code 1..maxVmId}, and the queue size is expected to equal {@code maxVmId}.
     * The elapsed time for starting and joining all producers is printed.
     *
     * @param numProducers number of producer threads to start
     * @param maxVmId highest VM id each producer submits
     */
    protected void testNumJobsEqToNumVms2(int numProducers, int maxVmId) {
        queue.clear();
        Thread[] pThreads = new Thread[numProducers];
        Profiler p = new Profiler();
        p.start();
        for (int i = 0; i < numProducers; i++) {
            pThreads[i] = new Thread(new Producer(maxVmId));
            pThreads[i].start();
        }
        joinAll(pThreads);
        p.stop();
        System.out.println("Num Vms= " + maxVmId + " Queue size = " + queue.size() + " time=" + p.getDurationInMillis() + " ms");
        assertEquals(maxVmId, queue.size());
    }

    /** Runs the identical-producers scenario over several producer-count / VM-count combinations. */
    public void testNumJobsEqToNumVms3() {
        testNumJobsEqToNumVms2(50, 20000);
        testNumJobsEqToNumVms2(400, 5000);
        testNumJobsEqToNumVms2(1, 1);
        testNumJobsEqToNumVms2(1, 1000000);
        testNumJobsEqToNumVms2(750, 1);
    }

    /**
     * Starts {@code numConsumers} consumers that each request one job, then {@code numProducers}
     * producers that each submit ids {@code 1..maxVmId}. After all threads finish, expects that
     * exactly {@code numConsumers} jobs were dequeued in total and that the queue size equals the
     * sum of the producers' reported new work minus the number dequeued.
     *
     * @param numConsumers number of consumer threads (started before the producers)
     * @param numProducers number of producer threads
     * @param maxVmId highest VM id each producer submits
     */
    protected void _testDequeueOneJob(final int numConsumers, final int numProducers, final int maxVmId) {
        queue.clear();
        Thread[] pThreads = new Thread[numProducers];
        Thread[] cThreads = new Thread[numConsumers];
        Consumer[] consumers = new Consumer[numConsumers];
        Producer[] producers = new Producer[numProducers];

        // Consumers are started first, so they may call getWork before any work is queued.
        for (int i = 0; i < numConsumers; i++) {
            consumers[i] = new Consumer(1);
            cThreads[i] = new Thread(consumers[i]);
            cThreads[i].start();
        }
        for (int i = 0; i < numProducers; i++) {
            producers[i] = new Producer(maxVmId);
            pThreads[i] = new Thread(producers[i]);
            pThreads[i].start();
        }

        joinAll(cThreads);
        joinAll(pThreads);

        int totalDequeued = 0;
        for (int i = 0; i < numConsumers; i++) {
            totalDequeued += consumers[i].getNumJobsDequeued();
        }
        int totalQueued = 0;
        for (int i = 0; i < numProducers; i++) {
            totalQueued += producers[i].getNewWork();
        }
        System.out.println("Total jobs dequeued = " + totalDequeued + ", num queued=" + totalQueued + " queue current size=" + queue.size());
        assertEquals(numConsumers, totalDequeued);
        assertEquals(totalQueued - totalDequeued, queue.size());
    }

    /**
     * After a 10-consumer / 10-producer run, dequeues one more job with a single consumer and
     * expects the queue size to drop by exactly one.
     */
    public void testDequeueOneJobAgain() {
        _testDequeueOneJob(10, 10, 1000);
        int queueSize = queue.size();
        Thread cThread = new Thread(new Consumer(1));
        cThread.start();
        joinOrFail(cThread);
        assertEquals(queueSize - 1, queue.size());
    }

    /** Runs the dequeue-one-job scenario over several consumer / producer / VM-count combinations. */
    public void testDequeueOneJob() {
        _testDequeueOneJob(10, 10, 1000);
        _testDequeueOneJob(1, 10, 1000);
        _testDequeueOneJob(10, 1, 1000);
        _testDequeueOneJob(10, 1, 10);
    }
}