SLING-9385 - Introduce properties
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
index 29c1a57..948489d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.distribution.journal;
 
+import java.util.Map;
+
 public interface MessageInfo {
 
     String getTopic();
@@ -28,7 +30,5 @@
 
     long getCreateTime();
     
-    String getOrg();
-    
-    String getSource();
+    Map<String, String> getProps();
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
index 83a9eaa..0a7f668 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
@@ -18,10 +18,14 @@
  */
 package org.apache.sling.distribution.journal;
 
+import java.util.Map;
+
 import com.google.protobuf.GeneratedMessage;
 
 public interface MessageSender<T extends GeneratedMessage> {
 
-	void send(String topic, T payload) throws MessagingException;
+    void send(String topic, T payload) throws MessagingException;
+    
+    void send(String topic, T payload, Map<String, String> properties) throws MessagingException;
 
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
index c73dfb1..c4d49f9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
@@ -19,27 +19,38 @@
 package org.apache.sling.distribution.journal;
 
 import java.io.Closeable;
+import java.util.function.Consumer;
 
 import com.google.protobuf.GeneratedMessage;
 
 public interface MessagingProvider {
 
-	<T extends GeneratedMessage> MessageSender<T> createSender();
+    <T extends GeneratedMessage> MessageSender<T> createSender();
+    
+    /**
+     * Create a sender for a fixed topic
+     * 
+     * @param <T>
+     * @param topic
+     * @return
+     */
+    default <T extends GeneratedMessage> Consumer<T> createSender(String topic) {
+        MessageSender<GeneratedMessage> sender = createSender();
+        return payload -> sender.send(topic, payload);
+    }
 
-	<T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters);
+    <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters);
 
-	Closeable createPoller(String topicName, Reset reset, String assign,
-						   HandlerAdapter<?>... adapters);
+    Closeable createPoller(String topicName, Reset reset, String assign, HandlerAdapter<?>... adapters);
 
-	<T> JsonMessageSender<T> createJsonSender();
+    <T> JsonMessageSender<T> createJsonSender();
 
-	<T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler,
-								   Class<T> type);
+    <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler, Class<T> type);
 
-	void assertTopic(String topic) throws MessagingException;
+    void assertTopic(String topic) throws MessagingException;
 
-	long retrieveOffset(String topicName, Reset reset);
+    long retrieveOffset(String topicName, Reset reset);
 
-	String assignTo(long offset);
+    String assignTo(long offset);
 
 }
\ No newline at end of file