Allow to specify a type to deliver / consume in the API
diff --git a/pom.xml b/pom.xml
index b6359d4..901ef1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,6 +7,9 @@
<packaging>pom</packaging>
<properties>
+ <reactor.version>3.0.7.RELEASE</reactor.version>
+ <jetty.version>8.1.15.v20140411</jetty.version>
+ <cxf.version>3.2.0-SNAPSHOT</cxf.version>
</properties>
<modules>
diff --git a/rcomp-api/src/main/java/component/api/MComponent.java b/rcomp-api/src/main/java/component/api/MComponent.java
index 67d0ea8..2573aeb 100644
--- a/rcomp-api/src/main/java/component/api/MComponent.java
+++ b/rcomp-api/src/main/java/component/api/MComponent.java
@@ -3,7 +3,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
-public interface MComponent<T> {
- Publisher<T> from(String destination);
- Subscriber<T> to(String destination);
+public interface MComponent {
+ <S> Publisher<S> from(String destination, Class<? extends S> type);
+ <S> Subscriber<S> to(String destination, Class<? extends S> type);
}
diff --git a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
index 5b264a0..f3506f7 100644
--- a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminComponent.java
@@ -1,7 +1,5 @@
package component.eventadmin;
-import java.util.Map;
-
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -14,7 +12,7 @@
import component.api.MComponent;
@Component(property="name=eventAdmin")
-public class EventAdminComponent implements MComponent<Map<String, ?>> {
+public class EventAdminComponent implements MComponent {
private BundleContext context;
@@ -32,13 +30,13 @@
}
@Override
- public Publisher<Map<String, ?>> from(String topic) {
- return new EventAdminSource(context, topic);
+ public <T> Publisher<T> from(String topic, Class<? extends T> type) {
+ return new EventAdminSource<T>(context, topic, type);
}
@Override
- public Subscriber<Map<String, ?>> to(String topic) {
- return new EventAdminDestination(client, topic);
+ public <T> Subscriber<T> to(String topic, Class<? extends T> type) {
+ return new EventAdminDestination<T>(client, topic, type);
}
}
diff --git a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
index 0e7f667..9c58201 100644
--- a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminDestination.java
@@ -7,15 +7,18 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class EventAdminDestination implements Subscriber<Map<String, ?>> {
+public class EventAdminDestination<T> implements Subscriber<T> {
private EventAdmin client;
private String topic;
private Subscription subscription;
- public EventAdminDestination(EventAdmin client, String topic) {
+ public EventAdminDestination(EventAdmin client, String topic, Class<? extends T> type) {
this.client = client;
this.topic = topic;
+ if (! type.equals(Map.class)) {
+ throw new IllegalArgumentException("Curently only Map<String, ?> is supported");
+ }
}
@Override
@@ -25,9 +28,9 @@
}
@Override
- public void onNext(Map<String, ?> payload) {
+ public void onNext(T payload) {
try {
- Event event = new Event(topic, payload);
+ Event event = new Event(topic, convertTo(payload));
this.client.sendEvent(event);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -36,6 +39,11 @@
}
}
+ @SuppressWarnings("unchecked")
+ private Map<String, ?> convertTo(T payload) {
+ return (Map<String, ?>) payload;
+ }
+
@Override
public void onError(Throwable t) {
}
diff --git a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
index bbb560b..0354f75 100644
--- a/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
+++ b/rcomp-eventadmin/src/main/java/component/eventadmin/EventAdminSource.java
@@ -15,26 +15,29 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class EventAdminSource implements Publisher<Map<String, ?>> {
+public class EventAdminSource<T> implements Publisher<T> {
private BundleContext context;
private String topic;
- public EventAdminSource(BundleContext context, String topic) {
+ public EventAdminSource(BundleContext context, String topic, Class<? extends T> type) {
this.context = context;
this.topic = topic;
+ if (! type.equals(Map.class)) {
+ throw new IllegalArgumentException("Curently only Map<String, ?> is supported");
+ }
}
@Override
- public void subscribe(Subscriber<? super Map<String, ?>> subscriber) {
+ public void subscribe(Subscriber<? super T> subscriber) {
subscriber.onSubscribe(new EventAdminSubscription(subscriber));
}
public class EventAdminSubscription implements Subscription, EventHandler {
private AtomicBoolean subScribed;
private ServiceRegistration<EventHandler> sreg;
- private Subscriber<? super Map<String, ?>> subscriber;
+ private Subscriber<? super T> subscriber;
- public EventAdminSubscription(Subscriber<? super Map<String, ?>> subscriber) {
+ public EventAdminSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
this.subScribed = new AtomicBoolean(false);
}
@@ -61,7 +64,12 @@
@Override
public void handleEvent(Event event) {
- this.subscriber.onNext(toMap(event));
+ this.subscriber.onNext(convertTo(toMap(event)));
+ }
+
+ @SuppressWarnings("unchecked")
+ private T convertTo(Map<String, ?> map) {
+ return (T) map;
}
Map<String, ?> toMap(Event event) {
diff --git a/rcomp-examples/pom.xml b/rcomp-examples/pom.xml
index f726a2c..a45daa5 100644
--- a/rcomp-examples/pom.xml
+++ b/rcomp-examples/pom.xml
@@ -24,28 +24,30 @@
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
- <version>3.0.7.RELEASE</version>
+ <version>${reactor.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-adapter</artifactId>
- <version>3.0.7.RELEASE</version>
+ <version>${reactor.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-client</artifactId>
- <version>3.2.0-SNAPSHOT</version>
+ <version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>8.1.15.v20140411</version>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>8.1.15.v20140411</version>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
</dependency>
</dependencies>
diff --git a/rcomp-examples/src/main/java/reactortest/EventAdminExample.java b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
index f4e490d..2417f0b 100644
--- a/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
+++ b/rcomp-examples/src/main/java/reactortest/EventAdminExample.java
@@ -18,12 +18,12 @@
Logger LOG = LoggerFactory.getLogger(EventAdminExample.class);
@Reference(target="(name=eventAdmin)")
- MComponent<Map<String, ? >> eventAdmin;
+ MComponent eventAdmin;
@Activate
public void start() throws Exception {
- Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input");
- Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output");
+ Publisher<Map<String, ?>> fromTopic = eventAdmin.from("input", Map.class);
+ Subscriber<Map<String, ?>> toTopic = eventAdmin.to("output", Map.class);
Flux.from(fromTopic)
.log()
.subscribe(toTopic);
diff --git a/rcomp-examples/src/main/java/reactortest/MqttExample.java b/rcomp-examples/src/main/java/reactortest/MqttExample.java
index feb214e..1c37247 100644
--- a/rcomp-examples/src/main/java/reactortest/MqttExample.java
+++ b/rcomp-examples/src/main/java/reactortest/MqttExample.java
@@ -17,13 +17,13 @@
Logger LOG = LoggerFactory.getLogger(MqttExample.class);
@Reference(target="(name=mqtt)")
- MComponent<byte[]> mqtt;
+ MComponent mqtt;
@Activate
public void start() throws Exception {
LOG.info("Starting mqtt test component");
- Publisher<byte[]> fromTopic = mqtt.from("input");
- Subscriber<byte[]> toTopic = mqtt.to("output");
+ Publisher<byte[]> fromTopic = mqtt.from("input", byte[].class);
+ Subscriber<byte[]> toTopic = mqtt.to("output", byte[].class);
Flux.from(fromTopic)
.map(ByteArrayConverter::asInteger)
.log()
diff --git a/rcomp-examples/src/test/java/examples/TestRs.java b/rcomp-examples/src/test/java/examples/TestRs.java
index 96aa543..feef726 100644
--- a/rcomp-examples/src/test/java/examples/TestRs.java
+++ b/rcomp-examples/src/test/java/examples/TestRs.java
@@ -52,6 +52,7 @@
ServletHandler handler = new ServletHandler();
Servlet servlet = new HttpServlet() {
+ private static final long serialVersionUID = 1L;
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
diff --git a/rcomp-examples/src/test/java/examples/TestServlet.java b/rcomp-examples/src/test/java/examples/TestServlet.java
new file mode 100644
index 0000000..b945985
--- /dev/null
+++ b/rcomp-examples/src/test/java/examples/TestServlet.java
@@ -0,0 +1,72 @@
+package examples;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import reactor.core.publisher.Mono;
+
+public class TestServlet {
+
+ @Test
+ public void testServlet() throws Exception {
+ Server server = createServer();
+ server.start();
+ WebTarget client = ClientBuilder.newClient().target("http://localhost:8384/Hello");
+ String result = client.request().get(String.class);
+ Assert.assertEquals("/Hello", result);
+ server.stop();
+ }
+
+ private final class TestAsyncServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ final AsyncContext ctx = req.startAsync(req, resp);
+ Mono.just(req.getServletPath())
+ .delayElement(Duration.ofMillis(2000))
+ .doOnSuccess(result -> writeResult(ctx, result))
+ .log()
+ .subscribe();
+ System.out.println("Returning from servlet. Request is handled asynchronously");
+ }
+
+ private void writeResult(AsyncContext ctx, String result) {
+ try {
+ ctx.getResponse().getWriter().append(result);
+ ctx.complete();
+ } catch (IOException e) {
+
+ }
+ }
+ }
+
+
+ private Server createServer() {
+ Server server = new Server();
+ SelectChannelConnector connector = new SelectChannelConnector();
+ connector.setPort(8384);
+ server.addConnector(connector);
+
+ ServletContextHandler handler = new ServletContextHandler();
+ handler.setContextPath("/");
+ handler.addServlet(new ServletHolder(new TestAsyncServlet()), "/");
+ server.setHandler(handler);
+ return server;
+ }
+}
diff --git a/rcomp-mail/src/main/java/component/mail/MailComponent.java b/rcomp-mail/src/main/java/component/mail/MailComponent.java
index 81986c2..62ea322 100644
--- a/rcomp-mail/src/main/java/component/mail/MailComponent.java
+++ b/rcomp-mail/src/main/java/component/mail/MailComponent.java
@@ -1,7 +1,6 @@
package component.mail;
import javax.mail.Session;
-import javax.mail.internet.MimeMessage;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
@@ -11,19 +10,19 @@
import component.api.MComponent;
@Component(property="name=mail")
-public class MailComponent implements MComponent<MimeMessage> {
+public class MailComponent implements MComponent {
@Reference
Session session;
@Override
- public Publisher<MimeMessage> from(String topic) {
+ public <T> Publisher<T> from(String topic, Class<? extends T> type) {
throw new RuntimeException();
}
@Override
- public Subscriber<MimeMessage> to(String destination) {
- return new MailDestination(destination);
+ public <T> Subscriber<T> to(String destination, Class<? extends T> type) {
+ return new MailDestination<T>(destination, type);
}
}
diff --git a/rcomp-mail/src/main/java/component/mail/MailDestination.java b/rcomp-mail/src/main/java/component/mail/MailDestination.java
index d9ebf27..15d38c6 100644
--- a/rcomp-mail/src/main/java/component/mail/MailDestination.java
+++ b/rcomp-mail/src/main/java/component/mail/MailDestination.java
@@ -1,6 +1,7 @@
package component.mail;
import javax.mail.Address;
+import javax.mail.Message;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
@@ -8,13 +9,16 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class MailDestination implements Subscriber<MimeMessage> {
+public class MailDestination<T> implements Subscriber<T> {
private String destination;
private Subscription subscription;
- public MailDestination(String destination) {
+ public MailDestination(String destination, Class<? extends T> type) {
this.destination = destination;
+ if (! type.equals(MimeMessage.class)) {
+ throw new IllegalArgumentException("Curently only MimeMessage is supported");
+ }
}
@Override
@@ -24,10 +28,10 @@
}
@Override
- public void onNext(MimeMessage message) {
+ public void onNext(T message) {
try {
Address[] addresses = new Address[]{new InternetAddress(destination)};
- Transport.send(message, addresses);
+ Transport.send(convertTo(message), addresses);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
@@ -35,6 +39,10 @@
}
}
+ private Message convertTo(T message) {
+ return (Message) message;
+ }
+
@Override
public void onError(Throwable t) {
}
diff --git a/rcomp-mail/src/test/java/component/mail/TestMail.java b/rcomp-mail/src/test/java/component/mail/TestMail.java
index d27e4c0..dbc6a99 100644
--- a/rcomp-mail/src/test/java/component/mail/TestMail.java
+++ b/rcomp-mail/src/test/java/component/mail/TestMail.java
@@ -25,7 +25,7 @@
MailComponent mail = new MailComponent();
mail.session = session;
- Subscriber<MimeMessage> to = mail.to("cschneider@localhost");
+ Subscriber<MimeMessage> to = mail.to("cschneider@localhost", MimeMessage.class);
Flux.just("Test").map(txt -> createMessage(session, txt)).subscribe(to);
}
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
index f35f492..352475f 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttComponent.java
@@ -12,7 +12,7 @@
import component.api.MComponent;
@Component(property="name=mqtt")
-public class MqttComponent implements MComponent<byte[]> {
+public class MqttComponent implements MComponent {
MqttClient client;
@@ -35,13 +35,13 @@
}
@Override
- public Publisher<byte[]> from(String topic) {
- return new MqttSource(client, topic);
+ public <T> Publisher<T> from(String topic, Class<? extends T> type) {
+ return new MqttSource<T>(client, topic, type);
}
@Override
- public Subscriber<byte[]> to(String topic) {
- return new MqttDestination(client, topic);
+ public <T> Subscriber<T> to(String topic, Class<? extends T> type) {
+ return new MqttDestination<T>(client, topic, type);
}
}
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
index daa0c57..52236cd 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttDestination.java
@@ -5,15 +5,18 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class MqttDestination implements Subscriber<byte[]> {
+public class MqttDestination<T> implements Subscriber<T> {
private MqttClient client;
private String topic;
private Subscription subscription;
- public MqttDestination(MqttClient client, String topic) {
+ public MqttDestination(MqttClient client, String topic, Class<? extends T> type) {
this.client = client;
this.topic = topic;
+ if (! type.equals(byte[].class)) {
+ throw new IllegalArgumentException("Curently only byte[] is supported");
+ }
}
@Override
@@ -23,9 +26,9 @@
}
@Override
- public void onNext(byte[] payload) {
+ public void onNext(T payload) {
try {
- MqttMessage message = new MqttMessage(payload);
+ MqttMessage message = new MqttMessage(convertTo(payload));
this.client.publish(topic, message);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -34,6 +37,10 @@
}
}
+ private byte[] convertTo(T payload) {
+ return (byte[])payload;
+ }
+
@Override
public void onError(Throwable t) {
System.out.println("onerr");
diff --git a/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java b/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
index 5b81963..1ac3e74 100644
--- a/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
+++ b/rcomp-mqtt/src/main/java/component/mqtt/MqttSource.java
@@ -10,25 +10,28 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-public class MqttSource implements Publisher<byte[]> {
+public class MqttSource<T> implements Publisher<T> {
private MqttClient client;
private String topic;
- public MqttSource(MqttClient client, String topic) {
+ public MqttSource(MqttClient client, String topic, Class<? extends T> type) {
this.client = client;
this.topic = topic;
+ if (! type.equals(byte[].class)) {
+ throw new IllegalArgumentException("Curently only byte[] is supported");
+ }
}
@Override
- public void subscribe(Subscriber<? super byte[]> subscriber) {
+ public void subscribe(Subscriber<? super T> subscriber) {
subscriber.onSubscribe(new MqttSubscription(subscriber));
}
public class MqttSubscription implements IMqttMessageListener, Subscription {
private AtomicBoolean subScribed;
- private Subscriber<? super byte[]> subscriber;
+ private Subscriber<? super T> subscriber;
- public MqttSubscription(Subscriber<? super byte[]> subscriber) {
+ public MqttSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
this.subScribed = new AtomicBoolean(false);
}
@@ -57,7 +60,17 @@
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
- this.subscriber.onNext(message.getPayload());
+ this.subscriber.onNext(converTo(message.getPayload()));
+ }
+
+ /**
+ * Simple conversion that just supports byte[]
+ * @param payload
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ private T converTo(byte[] payload) {
+ return (T) payload;
}
}
diff --git a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
index 32a1fe0..2674c3f 100644
--- a/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
+++ b/rcomp-mqtt/src/test/java/component/mqtt/Test1.java
@@ -23,8 +23,8 @@
client.connect();
MqttComponent mqtt = new MqttComponent();
mqtt.client = client;
- Publisher<byte[]> fromTopic = mqtt.from("input");
- Subscriber<byte[]> toTopic = mqtt.to("output");
+ Publisher<byte[]> fromTopic = mqtt.from("input", byte[].class);
+ Subscriber<byte[]> toTopic = mqtt.to("output", byte[].class);
Flux.from(fromTopic)
.log()
.subscribe(toTopic);