blob: aaf77e0e9baedda4967a4b7c4581cdcb9cb20704 [file] [log] [blame]
package org.apache.aries.events.memory;
import static org.apache.aries.events.api.SubscribeRequest.to;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Messaging;
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
public class MessagingTest {
@Mock
private Consumer<Received> callback;
@Captor
private ArgumentCaptor<Received> messageCaptor;
private Set<Subscription> subscriptions = new HashSet<>();
private Messaging messaging;
@Before
public void before() {
initMocks(this);
messaging = new InMemoryMessaging();
}
@After
public void after() {
subscriptions.forEach(Subscription::close);
}
@Test
public void testPositionFromString() {
Position pos = messaging.positionFromString("1");
assertThat(pos.compareTo(new MemoryPosition(1)), equalTo(0));
assertThat(pos.positionToString(), equalTo("1"));
}
@Test
public void testSend() {
subscribe(to("test", callback).seek(Seek.earliest));
String content = "testcontent";
send("test", content);
verify(callback, timeout(1000)).accept(messageCaptor.capture());
Received received = messageCaptor.getValue();
assertThat(received.getMessage().getPayload(), equalTo(toBytes(content)));
assertEquals(0, received.getPosition().compareTo(new MemoryPosition(0)));
assertThat(received.getMessage().getProperties().size(), equalTo(1));
assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
}
@Test(expected=NullPointerException.class)
public void testInvalidSubscribe() {
subscribe(to("test", callback).seek(null));
}
@Test
public void testExceptionInHandler() {
doThrow(new RuntimeException("Expected exception")).when(callback).accept(Mockito.any(Received.class));
subscribe(to("test", callback));
send("test", "testcontent");
verify(callback, timeout(1000)).accept(messageCaptor.capture());
}
@Test
public void testEarliestBefore() {
subscribe(to("test", callback).seek(Seek.earliest));
send("test", "testcontent");
send("test", "testcontent2");
verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent", "testcontent2"));
}
@Test
public void testEarliestAfter() {
send("test", "testcontent");
subscribe(to("test", callback).seek(Seek.earliest));
send("test", "testcontent2");
verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent", "testcontent2"));
}
@Test
public void testLatestBefore() {
subscribe(to("test", callback));
send("test", "testcontent");
send("test", "testcontent2");
verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent", "testcontent2"));
}
@Test
public void testLatest() {
send("test", "testcontent");
subscribe(to("test", callback));
send("test", "testcontent2");
verify(callback, timeout(1000)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent2"));
}
@Test
public void testFrom1() {
send("test", "testcontent");
send("test", "testcontent2");
subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
verify(callback, timeout(1000)).accept(messageCaptor.capture());
assertThat(messageContents(), contains("testcontent2"));
}
private void subscribe(SubscribeRequest request) {
this.subscriptions.add(messaging.subscribe(request));
}
private List<String> messageContents() {
return messageCaptor.getAllValues().stream()
.map(this::getContent).collect(Collectors.toList());
}
private String getContent(Received rec) {
return new String(rec.getMessage().getPayload(), Charset.forName("UTF-8"));
}
private void send(String topic, String content) {
Map<String, String> props = new HashMap<String, String>();
props.put("my", "testvalue");
Message message = messaging.newMessage(toBytes(content), props);
messaging.send(topic, message);
}
private byte[] toBytes(String content) {
return content.getBytes(Charset.forName("UTF-8"));
}
}