blob: 6667969cacbfc6965131ea7b8fd85a818e0233ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aries.typedevent.bus.impl;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.osgi.framework.Constants;
import org.osgi.service.typedevent.TypedEventConstants;
import org.osgi.service.typedevent.TypedEventHandler;
import org.osgi.service.typedevent.UnhandledEventHandler;
import org.osgi.service.typedevent.UntypedEventHandler;
import org.osgi.util.converter.Converters;
public class TypedEventBusImplTest {
private static final String SPECIAL_TEST_EVENT_TOPIC = SpecialTestEvent.class.getName().replace(".", "/");
private static final String TEST_EVENT_TOPIC = TestEvent.class.getName().replace(".", "/");
public static class TestEvent {
public String message;
}
public static class TestEvent2 {
public int count;
}
public static class SpecialTestEvent extends TestEvent {
}
@Mock(lenient = true)
TypedEventHandler<Object> handlerA, handlerB;
@Mock(lenient = true)
UntypedEventHandler untypedHandlerA, untypedHandlerB;
@Mock(lenient = true)
UnhandledEventHandler unhandledHandler;
Semaphore semA = new Semaphore(0), semB = new Semaphore(0), untypedSemA = new Semaphore(0),
untypedSemB = new Semaphore(0), unhandledSem = new Semaphore(0);
TypedEventBusImpl impl;
TypedEventMonitorImpl monitorImpl;
private AutoCloseable mocks;
@BeforeEach
public void start() {
mocks = MockitoAnnotations.openMocks(this);
Mockito.doAnswer(i -> {
semA.release();
return null;
}).when(handlerA).notify(Mockito.anyString(), Mockito.any());
Mockito.doAnswer(i -> {
semB.release();
return null;
}).when(handlerB).notify(Mockito.anyString(), Mockito.any());
Mockito.doAnswer(i -> {
untypedSemA.release();
return null;
}).when(untypedHandlerA).notifyUntyped(Mockito.anyString(), Mockito.any());
Mockito.doAnswer(i -> {
untypedSemB.release();
return null;
}).when(untypedHandlerB).notifyUntyped(Mockito.anyString(), Mockito.any());
Mockito.doAnswer(i -> {
unhandledSem.release();
return null;
}).when(unhandledHandler).notifyUnhandled(Mockito.anyString(), Mockito.any());
monitorImpl = new TypedEventMonitorImpl(new HashMap<String, Object>());
impl = new TypedEventBusImpl(monitorImpl, new HashMap<String, Object>());
impl.start();
}
@AfterEach
public void stop() throws Exception {
impl.stop();
monitorImpl.destroy();
mocks.close();
}
/**
* Tests that events are delivered to Smart Behaviours based on type
*
* @throws InterruptedException
*/
@Test
public void testEventSending() throws InterruptedException {
TestEvent event = new TestEvent();
event.message = "boo";
Map<String, Object> serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put(Constants.SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent2.class.getName());
serviceProperties.put(Constants.SERVICE_ID, 43L);
impl.addTypedEventHandler(handlerB, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(Constants.SERVICE_ID, 44L);
impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
serviceProperties.put(Constants.SERVICE_ID, 45L);
impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
impl.deliver(event);
assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(handlerA).notify(Mockito.eq(TestEvent.class.getName().replace('.', '/')),
Mockito.argThat(isTestEventWithMessage("boo")));
assertFalse(semB.tryAcquire(1, TimeUnit.SECONDS));
assertTrue(untypedSemA.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(untypedHandlerA).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.', '/')),
Mockito.argThat(isUntypedTestEventWithMessage("boo")));
assertFalse(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
}
public static class TestEventHandler implements TypedEventHandler<TestEvent> {
@Override
public void notify(String topic, TestEvent event) {
// No op
}
}
public static interface TestEventHandlerIface extends TypedEventHandler<TestEvent> {
}
/**
* Tests that reified typedEventHandlers are properly processed
*
* @throws InterruptedException
*/
@Test
public void testGenericTypeInference() throws InterruptedException {
TypedEventHandler<TestEvent> handler = Mockito.spy(TestEventHandler.class);
TypedEventHandler<TestEvent> handler2 = Mockito.spy(TestEventHandler.class);
TypedEventHandler<TestEvent> handler3 = Mockito.mock(TestEventHandlerIface.class);
TestEvent event = new TestEvent();
event.message = "boo";
Map<String, Object> serviceProperties = new HashMap<>();
serviceProperties.put(Constants.SERVICE_ID, 42L);
impl.addTypedEventHandler(handler, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, SpecialTestEvent.class.getName());
serviceProperties.put(Constants.SERVICE_ID, 43L);
impl.addTypedEventHandler(handler2, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(Constants.SERVICE_ID, 44L);
impl.addTypedEventHandler(handler3, serviceProperties);
impl.deliver(event);
Mockito.verify(handler, Mockito.timeout(1000)).notify(eq(TEST_EVENT_TOPIC), argThat(isTestEventWithMessage("boo")));
Mockito.verify(handler3, Mockito.timeout(1000)).notify(eq(TEST_EVENT_TOPIC), argThat(isTestEventWithMessage("boo")));
Mockito.verify(handler2, Mockito.after(1000).never()).notify(Mockito.anyString(), Mockito.any());
event = new SpecialTestEvent();
event.message = "far";
impl.deliver(event);
Mockito.verify(handler, Mockito.after(1000).never()).notify(eq(SPECIAL_TEST_EVENT_TOPIC), Mockito.any());
Mockito.verify(handler3, Mockito.after(1000).never()).notify(eq(SPECIAL_TEST_EVENT_TOPIC), Mockito.any());
Mockito.verify(handler2, Mockito.timeout(1000)).notify(eq(SPECIAL_TEST_EVENT_TOPIC),
argThat(isSpecialTestEventWithMessage("far")));
}
/**
* Tests that events are delivered to Event Handlers based on type
*
* @throws InterruptedException
*/
@Test
public void testUntypedEventSending() throws InterruptedException {
TestEvent event = new TestEvent();
event.message = "boo";
Map<String, Object> serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName());
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put(Constants.SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName());
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent2.class.getName());
serviceProperties.put(Constants.SERVICE_ID, 43L);
impl.addTypedEventHandler(handlerB, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName());
serviceProperties.put(Constants.SERVICE_ID, 44L);
impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName());
serviceProperties.put(Constants.SERVICE_ID, 45L);
impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
impl.deliver(event.getClass().getName(), Converters.standardConverter().convert(event).to(Map.class));
assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(handlerA).notify(Mockito.eq(TestEvent.class.getName()),
Mockito.argThat(isTestEventWithMessage("boo")));
assertFalse(semB.tryAcquire(1, TimeUnit.SECONDS));
assertTrue(untypedSemA.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(untypedHandlerA).notifyUntyped(Mockito.eq(TestEvent.class.getName()),
Mockito.argThat(isUntypedTestEventWithMessage("boo")));
assertFalse(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
}
/**
* Tests that filtering is applied to message sending/receiving
*
* @throws InterruptedException
*/
@Test
public void testEventFiltering() throws InterruptedException {
Map<String, Object> serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put("event.filter", "(message=foo)");
serviceProperties.put(Constants.SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put("event.filter", "(message=bar)");
serviceProperties.put(Constants.SERVICE_ID, 43L);
impl.addTypedEventHandler(handlerB, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put("event.filter", "(message=foo)");
serviceProperties.put(Constants.SERVICE_ID, 44L);
impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put("event.filter", "(message=bar)");
serviceProperties.put(Constants.SERVICE_ID, 45L);
impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
TestEvent event = new TestEvent();
event.message = "foo";
impl.deliver(event);
assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(handlerA).notify(Mockito.eq(TestEvent.class.getName().replace('.', '/')),
Mockito.argThat(isTestEventWithMessage("foo")));
assertFalse(semB.tryAcquire(1, TimeUnit.SECONDS));
assertTrue(untypedSemA.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(untypedHandlerA).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.', '/')),
Mockito.argThat(isUntypedTestEventWithMessage("foo")));
assertFalse(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
event = new TestEvent();
event.message = "bar";
impl.deliver(event);
assertTrue(semB.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(handlerB).notify(Mockito.eq(TestEvent.class.getName().replace('.', '/')),
Mockito.argThat(isTestEventWithMessage("bar")));
assertFalse(semA.tryAcquire(1, TimeUnit.SECONDS));
assertTrue(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(untypedHandlerB).notifyUntyped(Mockito.eq(TestEvent.class.getName().replace('.', '/')),
Mockito.argThat(isUntypedTestEventWithMessage("bar")));
assertFalse(untypedSemA.tryAcquire(1, TimeUnit.SECONDS));
}
/**
* Tests that filtering is applied to message sending/receiving
*
* @throws InterruptedException
*/
@Test
public void testEventFilteringWithEmptyStringFilter() throws InterruptedException {
Map<String, Object> serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put("event.filter", "");
serviceProperties.put(Constants.SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
TestEvent event = new TestEvent();
event.message = "foo";
impl.deliver(event);
assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS));
}
/**
* Tests that the consumer of last resort gets called appropriately
*
* @throws InterruptedException
*/
@Test
public void testUnhandledEventHandlers() throws InterruptedException {
Map<String, Object> serviceProperties = new HashMap<>();
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put("event.filter", "(message=foo)");
serviceProperties.put(Constants.SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
serviceProperties = new HashMap<>();
serviceProperties.put(Constants.SERVICE_ID, 45L);
impl.addUnhandledEventHandler(unhandledHandler, serviceProperties);
TestEvent event = new TestEvent();
event.message = "foo";
impl.deliver(event);
assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(handlerA).notify(Mockito.eq(TestEvent.class.getName().replace('.', '/')),
Mockito.argThat(isTestEventWithMessage("foo")));
assertFalse(unhandledSem.tryAcquire(1, TimeUnit.SECONDS));
event = new TestEvent();
event.message = "bar";
impl.deliver(event);
assertTrue(unhandledSem.tryAcquire(1, TimeUnit.SECONDS));
Mockito.verify(unhandledHandler).notifyUnhandled(Mockito.eq(TestEvent.class.getName().replace('.', '/')),
Mockito.argThat(isUntypedTestEventWithMessage("bar")));
assertFalse(semA.tryAcquire(1, TimeUnit.SECONDS));
}
ArgumentMatcher<TestEvent> isTestEventWithMessage(String message) {
return new ArgumentMatcher<TestEvent>() {
@Override
public boolean matches(TestEvent argument) {
return argument instanceof TestEvent && message.equals(((TestEvent) argument).message);
}
};
}
ArgumentMatcher<SpecialTestEvent> isSpecialTestEventWithMessage(String message) {
return new ArgumentMatcher<SpecialTestEvent>() {
@Override
public boolean matches(SpecialTestEvent argument) {
return argument instanceof SpecialTestEvent && message.equals(((SpecialTestEvent) argument).message);
}
};
}
ArgumentMatcher<Map<String, Object>> isUntypedTestEventWithMessage(String message) {
return new ArgumentMatcher<Map<String, Object>>() {
@Override
public boolean matches(Map<String, Object> argument) {
return argument != null && message.equals(argument.get("message"));
}
};
}
}