blob: 36b34198f17f9af1ccd79561e8e08bd00c660524 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.agent;
import static org.junit.Assert.*;
import java.util.List;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Stress test that drives a single {@link ActionQueue} from many threads at
 * once (enqueue, dequeue and dequeueAll workers) and then checks, per host,
 * that the number of commands taken out equals the number put in.
 */
public class TestActionQueue {
  private static final Logger LOG = LoggerFactory.getLogger(TestActionQueue.class);

  /** Number of worker threads started for each operation type. */
  private static final int THREAD_COUNT = 100;

  /**
   * Upper bound, in milliseconds, for any single wait in the test (queues
   * draining, one worker terminating) so a defect fails the test instead of
   * hanging the build.
   */
  private static final long WAIT_TIMEOUT_MS = 60000L;

  /**
   * Worker that repeatedly applies one kind of operation to every host's
   * queue until {@link #stop()} is called, counting per host how many
   * commands it enqueued or dequeued.
   */
  static class ActionQueueOperation implements Runnable {
    enum OpType {
      ENQUEUE,
      DEQUEUE,
      DEQUEUEALL
    }

    /** Cleared by {@link #stop()}; volatile so the worker thread sees it. */
    private volatile boolean shouldRun = true;

    /** First exception that terminated {@link #run()}, or null if none. */
    private volatile Throwable failure;

    /** Per-host counters, indexed like {@link #hosts}; written only by the worker thread. */
    private final long[] opCounts;
    private final ActionQueue actionQueue;
    private final OpType operation;
    private final String[] hosts;

    /**
     * @param aq    queue shared by all workers
     * @param hosts host names to operate on; also fixes the size of the counters
     * @param op    operation this worker performs
     */
    public ActionQueueOperation(ActionQueue aq, String[] hosts, OpType op) {
      this.actionQueue = aq;
      this.operation = op;
      this.hosts = hosts;
      // A new long[] is already zero-filled, no explicit reset needed.
      this.opCounts = new long[hosts.length];
    }

    /**
     * Returns the live per-host counters. The array is not synchronized, so
     * callers must only read it after the worker thread has been joined.
     */
    public long[] getOpCounts() {
      return opCounts;
    }

    /** Returns the exception that ended {@link #run()}, or null if it ended normally. */
    public Throwable getFailure() {
      return failure;
    }

    /** Asks the worker to exit after finishing its current pass over the hosts. */
    public void stop() {
      this.shouldRun = false;
    }

    @Override
    public void run() {
      try {
        switch (operation) {
          case ENQUEUE:
            enqueueOp();
            break;
          case DEQUEUE:
            dequeueOp();
            break;
          case DEQUEUEALL:
            dequeueAllOp();
            break;
        }
      } catch (Exception ex) {
        if (ex instanceof InterruptedException) {
          // Keep the interrupt visible to whoever owns this thread.
          Thread.currentThread().interrupt();
        }
        // Remember the failure so the test thread can assert on it; an
        // exception thrown from a worker thread does not fail the test by itself.
        failure = ex;
        LOG.error("Failure", ex);
        throw new RuntimeException("Failure", ex);
      }
    }

    /** Enqueues one StatusCommand per host on every pass. */
    private void enqueueOp() throws InterruptedException {
      while (shouldRun) {
        for (int i = 0; i < hosts.length; i++) {
          actionQueue.enqueue(hosts[i], new StatusCommand());
          opCounts[i]++;
        }
        Thread.sleep(1);
      }
    }

    /** Dequeues at most one command per host on every pass; null results are not counted. */
    private void dequeueOp() throws InterruptedException {
      while (shouldRun) {
        for (int i = 0; i < hosts.length; i++) {
          AgentCommand cmd = actionQueue.dequeue(hosts[i]);
          if (cmd != null) {
            opCounts[i]++;
          }
        }
        Thread.sleep(1);
      }
    }

    /** Drains each host's queue on every pass and counts the commands returned. */
    private void dequeueAllOp() throws InterruptedException {
      while (shouldRun) {
        for (int i = 0; i < hosts.length; i++) {
          List<AgentCommand> cmds = actionQueue.dequeueAll(hosts[i]);
          if (cmds != null && !cmds.isEmpty()) {
            opCounts[i] += cmds.size();
          }
        }
        Thread.sleep(1);
      }
    }
  }

  /**
   * Runs enqueue, dequeue and dequeueAll workers concurrently, stops the
   * producers, waits for the queues to drain, and verifies per host that
   * everything enqueued was dequeued exactly once.
   */
  @Test
  public void testConcurrentOperations() throws InterruptedException {
    ActionQueue aq = new ActionQueue();
    String[] hosts = new String[] { "h0", "h1", "h2", "h3", "h4", "h5", "h6",
        "h7", "h8", "h9" };

    ActionQueueOperation[] enqueOperators =
        newOperators(aq, hosts, ActionQueueOperation.OpType.ENQUEUE);
    ActionQueueOperation[] dequeOperators =
        newOperators(aq, hosts, ActionQueueOperation.OpType.DEQUEUE);
    ActionQueueOperation[] dequeAllOperators =
        newOperators(aq, hosts, ActionQueueOperation.OpType.DEQUEUEALL);

    Thread[] dequeThreads = startAll(dequeOperators);
    Thread[] enqueThreads = startAll(enqueOperators);
    Thread[] dequeAllThreads = startAll(dequeAllOperators);

    try {
      // Run for some time
      Thread.sleep(100);

      // Stop the enqueue and wait until every producer has really finished,
      // so nothing can be added after the queues are observed empty.
      stopAll(enqueOperators);
      joinAll(enqueThreads);

      // Give time to get everything dequeued
      awaitDrained(aq, hosts);
    } finally {
      // Always stop every worker, even when an assertion above failed, so no
      // threads keep spinning after the test.
      stopAll(enqueOperators);
      stopAll(dequeOperators);
      stopAll(dequeAllOperators);
    }

    // Joining guarantees no dequeue is still in flight and makes the
    // workers' counter updates visible to this thread.
    joinAll(dequeThreads);
    joinAll(dequeAllThreads);

    assertNoFailures(enqueOperators);
    assertNoFailures(dequeOperators);
    assertNoFailures(dequeAllOperators);

    for (int h = 0; h < hosts.length; h++) {
      long opsEnqueued = 0;
      long opsDequeued = 0;
      for (int i = 0; i < THREAD_COUNT; i++) {
        opsEnqueued += enqueOperators[i].getOpCounts()[h];
        opsDequeued += dequeOperators[i].getOpCounts()[h];
        opsDequeued += dequeAllOperators[i].getOpCounts()[h];
      }
      // Prevent degenerate case of all zeros.
      assertTrue("Nothing was enqueued for host " + hosts[h], opsEnqueued != 0);
      // Everything should be dequeued
      assertEquals("Queue not empty for host " + hosts[h], 0, aq.size(hosts[h]));
      LOG.info("Host: " + hosts[h] + ", opsEnqueued: " + opsEnqueued
          + ", opsDequeued: " + opsDequeued);
      assertEquals("Dequeued count differs from enqueued count for host " + hosts[h],
          opsEnqueued, opsDequeued);
    }
  }

  /** Creates THREAD_COUNT workers of the given type sharing one queue. */
  private static ActionQueueOperation[] newOperators(ActionQueue aq,
      String[] hosts, ActionQueueOperation.OpType op) {
    ActionQueueOperation[] operators = new ActionQueueOperation[THREAD_COUNT];
    for (int i = 0; i < operators.length; i++) {
      operators[i] = new ActionQueueOperation(aq, hosts, op);
    }
    return operators;
  }

  /** Starts one thread per worker and returns the threads so they can be joined. */
  private static Thread[] startAll(ActionQueueOperation[] operators) {
    Thread[] threads = new Thread[operators.length];
    for (int i = 0; i < operators.length; i++) {
      threads[i] = new Thread(operators[i]);
      threads[i].start();
    }
    return threads;
  }

  /** Signals every worker to stop; safe to call more than once. */
  private static void stopAll(ActionQueueOperation[] operators) {
    for (ActionQueueOperation operator : operators) {
      operator.stop();
    }
  }

  /** Waits for each thread to terminate and fails the test if one does not. */
  private static void joinAll(Thread[] threads) throws InterruptedException {
    for (Thread thread : threads) {
      thread.join(WAIT_TIMEOUT_MS);
      assertFalse("Worker thread did not terminate: " + thread.getName(),
          thread.isAlive());
    }
  }

  /** Polls every 10 ms until all host queues report size 0, failing on timeout. */
  private static void awaitDrained(ActionQueue aq, String[] hosts)
      throws InterruptedException {
    long startNanos = System.nanoTime();
    long timeoutNanos = WAIT_TIMEOUT_MS * 1000000L;
    while (true) {
      Thread.sleep(10);
      boolean allDequeued = true;
      for (String host : hosts) {
        if (aq.size(host) > 0) {
          allDequeued = false;
          break;
        }
      }
      if (allDequeued) {
        return;
      }
      if (System.nanoTime() - startNanos > timeoutNanos) {
        fail("Queues were not drained within " + WAIT_TIMEOUT_MS + " ms");
      }
    }
  }

  /** Fails the test if any worker ended with an exception. */
  private static void assertNoFailures(ActionQueueOperation[] operators) {
    for (ActionQueueOperation operator : operators) {
      Throwable failure = operator.getFailure();
      assertNull("Worker terminated with an exception: " + failure, failure);
    }
  }
}