SLING-9385 - Introduce properties
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
index 29c1a57..948489d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
@@ -18,6 +18,8 @@
*/
package org.apache.sling.distribution.journal;
+import java.util.Map;
+
public interface MessageInfo {
String getTopic();
@@ -28,7 +30,5 @@
long getCreateTime();
- String getOrg();
-
- String getSource();
+ Map<String, String> getProps();
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
index 83a9eaa..0a7f668 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
@@ -18,10 +18,14 @@
*/
package org.apache.sling.distribution.journal;
+import java.util.Map;
+
import com.google.protobuf.GeneratedMessage;
public interface MessageSender<T extends GeneratedMessage> {
- void send(String topic, T payload) throws MessagingException;
+ void send(String topic, T payload) throws MessagingException;
+
+ void send(String topic, T payload, Map<String, String> properties) throws MessagingException;
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
index c73dfb1..c4d49f9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
@@ -19,27 +19,38 @@
package org.apache.sling.distribution.journal;
import java.io.Closeable;
+import java.util.function.Consumer;
import com.google.protobuf.GeneratedMessage;
public interface MessagingProvider {
- <T extends GeneratedMessage> MessageSender<T> createSender();
+ <T extends GeneratedMessage> MessageSender<T> createSender();
+
+ /**
+ * Create a sender for a fixed topic
+ *
+ * @param <T>
+ * @param topic
+ * @return
+ */
+ default <T extends GeneratedMessage> Consumer<T> createSender(String topic) {
+ MessageSender<GeneratedMessage> sender = createSender();
+ return payload -> sender.send(topic, payload);
+ }
- <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters);
+ <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters);
- Closeable createPoller(String topicName, Reset reset, String assign,
- HandlerAdapter<?>... adapters);
+ Closeable createPoller(String topicName, Reset reset, String assign, HandlerAdapter<?>... adapters);
- <T> JsonMessageSender<T> createJsonSender();
+ <T> JsonMessageSender<T> createJsonSender();
- <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler,
- Class<T> type);
+ <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler, Class<T> type);
- void assertTopic(String topic) throws MessagingException;
+ void assertTopic(String topic) throws MessagingException;
- long retrieveOffset(String topicName, Reset reset);
+ long retrieveOffset(String topicName, Reset reset);
- String assignTo(long offset);
+ String assignTo(long offset);
}
\ No newline at end of file