blob: af541086a37bcdfd8ef6d32282c2f098442b9cca [file] [log] [blame]
package brooklyn.management.internal;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.BasicGroup;
import brooklyn.entity.basic.Entities;
import brooklyn.event.SensorEvent;
import brooklyn.event.SensorEventListener;
import brooklyn.management.SubscriptionHandle;
import brooklyn.management.SubscriptionManager;
import brooklyn.test.entity.TestApplication;
import brooklyn.test.entity.TestEntity;
/**
* testing the {@link SubscriptionManager} and associated classes.
*/
public class LocalSubscriptionManagerTest {
private static final int TIMEOUT_MS = 5000;
private TestApplication app;
private TestEntity entity;
@BeforeMethod(alwaysRun=true)
public void setup() {
app = new TestApplication();
entity = new TestEntity(app);
Entities.startManagement(app);
}
private void manage(Entity ...entities) {
for (Entity e: entities)
Entities.manage(e);
}
@Test
public void testSubscribeToEntityAttributeChange() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
app.subscribe(entity, TestEntity.SEQUENCE, new SensorEventListener<Object>() {
@Override public void onEvent(SensorEvent<Object> event) {
latch.countDown();
}});
entity.setSequenceValue(1234);
if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
fail("Timeout waiting for Event on TestEntity listener");
}
}
@Test
public void testSubscribeToEntityWithAttributeWildcard() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
app.subscribe(entity, null, new SensorEventListener<Object>() {
@Override public void onEvent(SensorEvent<Object> event) {
latch.countDown();
}});
entity.setSequenceValue(1234);
if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
fail("Timeout waiting for Event on TestEntity listener");
}
}
@Test
public void testSubscribeToAttributeChangeWithEntityWildcard() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
app.subscribe(null, TestEntity.SEQUENCE, new SensorEventListener<Object>() {
@Override public void onEvent(SensorEvent<Object> event) {
latch.countDown();
}});
entity.setSequenceValue(1234);
if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
fail("Timeout waiting for Event on TestEntity listener");
}
}
@Test
public void testSubscribeToChildAttributeChange() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
app.subscribeToChildren(app, TestEntity.SEQUENCE, new SensorEventListener<Object>() {
@Override public void onEvent(SensorEvent<Object> event) {
latch.countDown();
}});
entity.setSequenceValue(1234);
if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
fail("Timeout waiting for Event on child TestEntity listener");
}
}
@Test
public void testSubscribeToMemberAttributeChange() throws Exception {
BasicGroup group = new BasicGroup(app);
TestEntity member = new TestEntity(app);
manage(group, member);
group.addMember(member);
final List<SensorEvent<Integer>> events = new CopyOnWriteArrayList<SensorEvent<Integer>>();
final CountDownLatch latch = new CountDownLatch(1);
app.subscribeToMembers(group, TestEntity.SEQUENCE, new SensorEventListener<Integer>() {
@Override public void onEvent(SensorEvent<Integer> event) {
events.add(event);
latch.countDown();
}});
member.setAttribute(TestEntity.SEQUENCE, 123);
if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
fail("Timeout waiting for Event on parent TestEntity listener");
}
assertEquals(events.size(), 1);
assertEquals(events.get(0).getValue(), (Integer)123);
assertEquals(events.get(0).getSensor(), TestEntity.SEQUENCE);
assertEquals(events.get(0).getSource().getId(), member.getId());
}
// Regression test for ConcurrentModificationException in issue #327
@Test(groups="Integration")
public void testConcurrentSubscribingAndPublishing() throws Exception {
final AtomicReference<Exception> threadException = new AtomicReference<Exception>();
TestEntity entity = new TestEntity(app);
// Repeatedly subscribe and unsubscribe, so listener-set constantly changing while publishing to it.
// First create a stable listener so it is always the same listener-set object.
Thread thread = new Thread() {
public void run() {
try {
SensorEventListener<Object> noopListener = new SensorEventListener<Object>() {
@Override public void onEvent(SensorEvent<Object> event) {
}
};
app.getSubscriptionContext().subscribe(null, TestEntity.SEQUENCE, noopListener);
while (!Thread.currentThread().isInterrupted()) {
SubscriptionHandle handle = app.getSubscriptionContext().subscribe(null, TestEntity.SEQUENCE, noopListener);
app.getSubscriptionContext().unsubscribe(handle);
}
} catch (Exception e) {
threadException.set(e);
}
}
};
try {
thread.start();
for (int i = 0; i < 10000; i++) {
entity.setAttribute(TestEntity.SEQUENCE, i);
}
} finally {
thread.interrupt();
}
if (threadException.get() != null) throw threadException.get();
}
}