Merge pull request #20 from cschneider/master

Build badge
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..bf018d2
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,4 @@
+language: java
+jdk:
+  - oraclejdk8
+  
\ No newline at end of file
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
index 13419ee..44b77df 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
@@ -17,13 +17,56 @@
  */
 package org.apache.aries.events.api;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
 
 /**
  * TODO If we allow wild card consumption then a message also needs a topic
  */
-public interface Message {
-    byte[] getPayload();
+public final class Message {
+
+    private final byte[] payload;
+    private final Map<String, String> properties;
+
+    public Message(byte[] payload, Map<String, String> properties) {
+        requireNonNull(payload);
+        requireNonNull(properties);
+        this.payload = payload.clone();
+        this.properties = unmodifiableMap(new HashMap<>(properties));
+    }
+
+    public byte[] getPayload() {
+        return payload.clone();
+    }
     
-    Map<String, String> getProperties();
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public String toString() {
+        return "Message" + properties;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Message message = (Message) o;
+        return Arrays.equals(payload, message.payload) &&
+                properties.equals(message.properties);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(properties);
+        result = 31 * result + Arrays.hashCode(payload);
+        return result;
+    }
+
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
index 36433d9..a88954c 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
@@ -17,9 +17,6 @@
  */
 package org.apache.aries.events.api;
 
-import java.util.Map;
-import java.util.function.Consumer;
-
 /**
  * Journaled messaging API
  */
@@ -27,27 +24,21 @@
     /**
      * Send a message to a topic. When this method returns the message 
      * is safely persisted.
+     *
+     * Messages can be consumed by subscribing to the topic via the #subscribe method.
+     *
+     * Two messages sent sequentially to the same topic by the same
+     * thread, are guaranteed to be consumed in the same order by all subscribers.
      */
-    Position send(String topic, Message message);
+    void send(String topic, Message message);
 
     /**
-     * Subscribe to a topic. The callback is called for each message received.
-     * 
-     * @param topic to consume from. TODO Do we allow wild cards? 
-     * @param position in the topic to start consuming from 
-     * @param seek where to start from when position is not valid or null
-     * @param callback will be called for each message received
-     * @return Returned subscription must be closed by the caller to unsubscribe
+     * Subscribe to a topic.
+     * The returned subscription must be closed by the caller to unsubscribe.
+     *
+     * @param request to subscribe
      */
-    Subscription subscribe(String topic, Position position, Seek seek, Consumer<Received> callback);
-
-    /**
-     * Create a message with payload and metadata
-     * @param payload
-     * @param props
-     * @return
-     */
-    Message newMessage(byte[] payload, Map<String, String> props);
+    Subscription subscribe(SubscribeRequestBuilder request);
 
     /**
      * Deserialize the position from the string
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
index c6ed99f..e211638 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
@@ -21,8 +21,15 @@
  * Position in a the topic.
  * E.g. For a kafka implementation this would be a list of (partition, offset) as we do not support partitions 
  * this could simply be like an offset.
- * TODO How do we provide ordering without being too specific?
+ *
+ * The {@code Position} positions are ordered. The relative order between
+ * two positions can be computed by invoking {@code Comparable#compareTo}.
+ * Comparing this position with a specified position will return a negative
+ * integer, zero, or a positive integer as this position happened before,
+ * happened concurrently, or happened after the specified position.
  */
-public interface Position {
-    long getOffset();
+public interface Position extends Comparable<Position> {
+
+    String positionToString();
+
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Seek.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Seek.java
index e47853b..3dd8757 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Seek.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Seek.java
@@ -21,5 +21,14 @@
  * Starting position when no Position is available 
  */
 public enum Seek {
-    earliest, latest;
+
+    /**
+     * Seek to the first position (happened the earliest) on a topic.
+     */
+    earliest,
+
+    /**
+     * Seek to the last position (happened the latest) on a topic.
+     */
+    latest;
 }
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
new file mode 100644
index 0000000..71cf838
--- /dev/null
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.aries.events.api;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.function.Consumer;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+@ParametersAreNonnullByDefault
+public final class SubscribeRequestBuilder {
+    
+    private SubscribeRequest subscribeRequest;
+
+    private SubscribeRequestBuilder(SubscribeRequest subscribeRequest) {
+        this.subscribeRequest = subscribeRequest;
+    }
+
+    /**
+     * Build a subscription request for the given topic and {@code Consumer} callback.
+     *
+     * @param topic to consume from
+     * @param callback to be invoked for each message consumed
+     * @return a new subscription request
+     */
+    public static SubscribeRequestBuilder to(String topic, Consumer<Received> callback) {
+        return new SubscribeRequestBuilder(new SubscribeRequest(topic, callback));
+    }
+    
+    /**
+     * Set the {@code Position} position to start consuming from.
+     *
+     * @param position in the topic to start consuming from
+     * @return the updated subscribe request
+     */
+    public SubscribeRequestBuilder startAt(Position position) {
+        this.subscribeRequest.position = position;
+        return this;
+    }
+    
+    /**
+     * Set the earliest or latest position to start consuming from
+     * when the position is {@code null} or not valid. By default,
+     * seek is set to {@link Seek#latest}.
+     *
+     * @param seek where to start consuming when no valid position is specified
+     * @return the updated subscribe request
+     */
+    public SubscribeRequestBuilder seek(Seek seek) {
+        this.subscribeRequest.seek = requireNonNull(seek, "Seek must not be null");
+        return this;
+    }
+
+    
+    public SubscribeRequest build() {
+        return subscribeRequest;
+    }
+
+    public static class SubscribeRequest {
+        private final String topic;
+        private final Consumer<Received> callback;
+        private Position position;
+        private Seek seek = Seek.latest;
+        
+        private SubscribeRequest(String topic, Consumer<Received> callback) {
+            this.topic = topic;
+            this.callback = callback;
+        }
+        
+        public String getTopic() {
+            return topic;
+        }
+        
+        public Position getPosition() {
+            return position;
+        }
+        
+        public Seek getSeek() {
+            return seek;
+        }
+        
+        public Consumer<Received> getCallback() {
+            return callback;
+        }
+    }
+}
diff --git a/org.apache.aries.events.kafka/pom.xml b/org.apache.aries.events.kafka/pom.xml
new file mode 100644
index 0000000..bf85186
--- /dev/null
+++ b/org.apache.aries.events.kafka/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.aries.events</groupId>
+    <artifactId>org.apache.aries.events</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>org.apache.aries.events.kafka</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.aries.events</groupId>
+      <artifactId>org.apache.aries.events.api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- Apache Kafka -->
+    <dependency>
+      <groupId>org.apache.servicemix.bundles</groupId>
+      <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+      <version>2.1.0_1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.12</artifactId>
+      <version>2.1.0</version>
+    </dependency>
+  </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java
new file mode 100644
index 0000000..a638815
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+@ObjectClassDefinition(name = "Apache Aries Events - Apache Kafka endpoint",
+        description = "Apache Kafka endpoint")
+public @interface KafkaEndpoint {
+
+    @AttributeDefinition(name = "Kafka Bootstrap Servers",
+            description = "A comma separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.")
+    String kafkaBootstrapServers() default "localhost:9092";
+
+
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java
new file mode 100644
index 0000000..7f97d3e
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.Type;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.Designate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.Long.parseLong;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singleton;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+@Type("kafka")
+@Component(service = Messaging.class, configurationPolicy = ConfigurationPolicy.REQUIRE)
+@Designate(ocd = KafkaEndpoint.class)
+public class KafkaMessaging implements Messaging {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessaging.class);
+
+    /**
+     * The partition to send and receive records.
+     */
+    private static final int PARTITION = 0;
+
+    /**
+     * Shared Kafka producer instance ({@code KafkaProducer}s are thread-safe).
+     */
+    private KafkaProducer<String, byte[]> producer;
+
+    private Map<String, Object> producerConfig;
+
+    private KafkaEndpoint endPoint;
+
+    @Activate
+    public void activate(KafkaEndpoint endPoint) {
+        this.endPoint = endPoint;
+        producerConfig = new HashMap<>();
+        producerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
+        // We favour durability over throughput
+        // and thus requires full acknowledgment
+        // from replica leader and followers.
+        producerConfig.put(ACKS_CONFIG, "all");
+        producerConfig = unmodifiableMap(producerConfig);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        closeQuietly(producer);
+    }
+
+    @Override
+    public void send(String topic, Message message) {
+        ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topic, PARTITION, null, message.getPayload(), toHeaders(message.getProperties()));
+        try {
+            RecordMetadata metadata = kafkaProducer().send(record).get();
+            LOG.info(format("Sent to %s", metadata));
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(format("Failed to send mesage on topic %s", topic), e);
+        }
+    }
+
+    @Override
+    public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
+        SubscribeRequest request = requestBuilder.build();
+        KafkaConsumer<String, byte[]> consumer = buildKafkaConsumer(request.getSeek());
+
+        TopicPartition topicPartition = new TopicPartition(request.getTopic(), PARTITION);
+
+        Collection<TopicPartition> topicPartitions = singleton(topicPartition);
+        consumer.assign(topicPartitions);
+
+        if (request.getPosition() != null) {
+            consumer.seek(topicPartition, asKafkaPosition(request.getPosition()).getOffset());
+        } else if (request.getSeek() == Seek.earliest) {
+            consumer.seekToBeginning(topicPartitions);
+        } else {
+            consumer.seekToEnd(topicPartitions);
+        }
+
+        KafkaSubscription subscription = new KafkaSubscription(consumer, request.getCallback());
+        // TODO pool the threads
+        Thread thread = new Thread(subscription);
+        thread.setDaemon(true);
+        thread.start();
+        return subscription;
+    }
+
+    @Override
+    public Position positionFromString(String position) {
+        String[] chunks = position.split(":");
+        if (chunks.length != 2) {
+            throw new IllegalArgumentException(format("Illegal position format %s", position));
+        }
+        return new KafkaPosition(parseInt(chunks[0]), parseLong(chunks[1]));
+    }
+
+    static String positionToString(Position position) {
+        KafkaPosition kafkaPosition = asKafkaPosition(position);
+        return format("%s:%s", kafkaPosition.getPartition(), kafkaPosition.getOffset());
+    }
+
+    static Iterable<Header> toHeaders(Map<String, String> properties) {
+        return properties.entrySet().stream()
+                .map(KafkaMessaging::toHeader)
+                .collect(Collectors.toList());
+    }
+
+    static Map<String, String> toProperties(Headers headers) {
+        return stream(headers.spliterator(), true)
+                .collect(Collectors.toMap(Header::key, header -> new String(header.value(), UTF_8)));
+    }
+
+    static RecordHeader toHeader(Map.Entry<String, String> property) {
+        return new RecordHeader(property.getKey(), property.getValue().getBytes(UTF_8));
+    }
+
+    static Message toMessage(ConsumerRecord<String, byte[]> record) {
+        return new Message(record.value(), toProperties(record.headers()));
+    }
+
+
+    private synchronized KafkaProducer<String, byte[]> kafkaProducer() {
+        if (producer == null) {
+            producer = new KafkaProducer<>(producerConfig);
+        }
+        return producer;
+    }
+
+    private KafkaConsumer<String, byte[]> buildKafkaConsumer(Seek seek) {
+
+        String groupId = UUID.randomUUID().toString();
+
+        Map<String, Object> consumerConfig = new HashMap<>();
+        consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
+        consumerConfig.put(GROUP_ID_CONFIG, groupId);
+        consumerConfig.put(ENABLE_AUTO_COMMIT_CONFIG, false);
+        consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerConfig.put(AUTO_OFFSET_RESET_CONFIG, seek.name());
+
+        return new KafkaConsumer<>(unmodifiableMap(consumerConfig));
+    }
+
+
+    private void closeQuietly(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException ignore) {
+                // ignore
+            }
+        }
+    }
+
+    private static KafkaPosition asKafkaPosition(Position position) {
+        if (! KafkaPosition.class.isInstance(position)) {
+            throw new IllegalArgumentException(format("Position %s must be and instance of %s", position, KafkaPosition.class.getCanonicalName()));
+        }
+        return (KafkaPosition) position;
+    }
+
+
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java
new file mode 100644
index 0000000..7e36216
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import javax.annotation.Nonnull;
+
+import org.apache.aries.events.api.Position;
+
+public final class KafkaPosition implements Position {
+
+    private final int partition;
+
+    private final long offset;
+
+    public KafkaPosition(int partition, long offset) {
+        this.partition = partition;
+        this.offset = offset;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+    @Override
+    public String toString() {
+        return positionToString();
+    }
+
+    @Override
+    public String positionToString() {
+        return KafkaMessaging.positionToString(this);
+    }
+
+    @Override
+    public int compareTo(@Nonnull Position p) {
+        return Long.compare(offset, ((KafkaPosition)p).offset);
+    }
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java
new file mode 100644
index 0000000..eaf94c6
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+import static java.time.Duration.ofHours;
+import static java.util.Objects.requireNonNull;
+import static org.apache.aries.events.kafka.KafkaMessaging.toMessage;
+
+public class KafkaSubscription implements Subscription, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSubscription.class);
+
+    private volatile boolean running = true;
+
+    private final KafkaConsumer<String, byte[]> consumer;
+
+    private final Consumer<Received> callback;
+
+    public KafkaSubscription(KafkaConsumer<String, byte[]> consumer, Consumer<Received> callback) {
+        this.consumer = requireNonNull(consumer);
+        this.callback = requireNonNull(callback);
+    }
+
+    @Override
+    public void run() {
+        try {
+            for (;running;) {
+                ConsumerRecords<String, byte[]> records = consumer.poll(ofHours(1));
+                records.forEach(record -> callback.accept(toReceived(record)));
+            }
+        } catch (WakeupException e) {
+            if (running) {
+                LOG.error("WakeupException while running {}", e.getMessage(), e);
+                throw e;
+            } else {
+                LOG.debug("WakeupException while stopping {}", e.getMessage(), e);
+            }
+        } catch(Throwable t) {
+            LOG.error(format("Catch Throwable %s closing subscription", t.getMessage()), t);
+            throw t;
+        } finally {
+            // Close the network connections and sockets
+            consumer.close();
+        }
+    }
+
+    @Override
+    public void close() {
+        running = false;
+        consumer.wakeup();
+    }
+
+    private Received toReceived(ConsumerRecord<String, byte[]> record) {
+        Position position = new KafkaPosition(record.partition(), record.offset());
+        return new Received(position, toMessage(record));
+    }
+
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java
new file mode 100644
index 0000000..b37951b
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.kafka.setup.KafkaBaseTest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static java.nio.charset.Charset.forName;
+import static java.util.Collections.singletonMap;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+public class KafkaMessagingTest extends KafkaBaseTest {
+
+    @Test
+    public void testPositionFromString() throws Exception {
+        Messaging messaging = new KafkaMessaging();
+        KafkaPosition kafkaPosition = (KafkaPosition) messaging.positionFromString("0:100");
+        assertEquals(0, kafkaPosition.getPartition());
+        assertEquals(100, kafkaPosition.getOffset());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPositionFromStringIllegalArgument() throws Exception {
+        Messaging messaging = new KafkaMessaging();
+        messaging.positionFromString("0:100:23");
+    }
+
+    @Test(timeout = 10000)
+    public void testSendAndReceive() throws Exception {
+
+        String topic = "test_send_and_receive";
+        createTopic(topic, 1);
+
+        KafkaEndpoint kafkaEndpoint = Mockito.mock(KafkaEndpoint.class);
+        when(kafkaEndpoint.kafkaBootstrapServers())
+                .thenReturn(getKafkaLocal().getKafkaBootstrapServer());
+        KafkaMessaging messaging = new KafkaMessaging();
+        messaging.activate(kafkaEndpoint);
+
+        byte[] payload = "test".getBytes(forName("UTF-8"));
+
+        Message message = new Message(payload, singletonMap("prop1", "value1"));
+        messaging.send(topic, message);
+
+        Semaphore invoked = new Semaphore(0);
+
+        SubscribeRequestBuilder requestBuilder = SubscribeRequestBuilder
+                .to(topic, (received) -> invoked.release())
+                .startAt(new KafkaPosition(0, 0));
+
+        try (Subscription subscription = messaging.subscribe(requestBuilder)) {
+            invoked.tryAcquire(10, TimeUnit.SECONDS);
+        }
+
+        messaging.deactivate();
+    }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaPositionTest.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaPositionTest.java
new file mode 100644
index 0000000..3acd699
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaPositionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.aries.events.api.Position;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KafkaPositionTest {
+
+    private static final Random RAND = new Random();
+
+    @Test
+    public void testGetPartition() throws Exception {
+        assertEquals(10, new KafkaPosition(10, 1000).getPartition());
+    }
+
+    @Test
+    public void testGetOffset() throws Exception {
+        assertEquals(1000, new KafkaPosition(10, 1000).getOffset());
+    }
+
+    @Test
+    public void testPositionToString() throws Exception {
+        assertEquals("10:1000", new KafkaPosition(10, 1000).positionToString());
+    }
+
+    @Test
+    public void testCompareTo() throws Exception {
+        assertEquals(0, comparePositions(position(RAND.nextInt(), 5), position(RAND.nextInt(), 5)));
+        assertEquals(1, comparePositions(position(RAND.nextInt(), 10), position(RAND.nextInt(), 5)));
+        assertEquals(-1, comparePositions(position(RAND.nextInt(), 2), position(RAND.nextInt(), 5)));
+    }
+
+    @Test
+    public void testOrder() {
+        NavigableMap<Position, String> positions = new TreeMap<>();
+        positions.put(new KafkaPosition(0, 0), "earliest");
+        positions.put(new KafkaPosition(0, 1), "mid");
+        positions.put(new KafkaPosition(0, 2), "latest");
+        assertEquals("earliest", positions.firstEntry().getValue());
+        assertEquals("latest", positions.lastEntry().getValue());
+    }
+
+    private int comparePositions(KafkaPosition position1, KafkaPosition position2) {
+        return position1.compareTo(position2);
+    }
+
+    private KafkaPosition position(int partition, long offset) {
+        return new KafkaPosition(partition, offset);
+    }
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaBaseTest.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaBaseTest.java
new file mode 100644
index 0000000..61763b6
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaBaseTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka.setup;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.createTempDirectory;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.aries.events.kafka.setup.KafkaLocal.getKafkaProperties;
+import static org.apache.kafka.clients.admin.AdminClient.create;
+import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
+
+public class KafkaBaseTest {
+
+    private static KafkaLocal kafkaLocal;
+
+    private static ZooKeeperLocal zooKeeperLocal;
+
+    private static Logger LOG = LoggerFactory.getLogger(KafkaBaseTest.class);
+
+    @BeforeClass
+    public static void startKafka() throws IOException {
+
+        int zkPort = randomAvailablePort();
+        String zkDir = createTempDirectory("zk").toString();
+        String zkConnect = format("127.0.0.1:%s", zkPort);
+        zooKeeperLocal = new ZooKeeperLocal(ZooKeeperLocal.getZooKeeperProperties(zkDir, zkPort));
+        LOG.info(format("Started local ZooKeeper server on port %s and dataDirectory %s", zkPort, zkDir));
+
+        int kafkaPort = randomAvailablePort();
+        String kafkaLogDir = createTempDirectory("kafka").toString();
+        kafkaLocal = new KafkaLocal(getKafkaProperties(kafkaLogDir, kafkaPort, zkConnect));
+        LOG.info(format("Started local Kafka on port %s and logDirectory %s", zkConnect, kafkaLogDir));
+    }
+
+    @AfterClass
+    public static void shutdownKafka() {
+        if (kafkaLocal != null) {
+            kafkaLocal.stop();
+        }
+        if (zooKeeperLocal != null) {
+            zooKeeperLocal.stop();
+        }
+    }
+
+    public static KafkaLocal getKafkaLocal() {
+        return kafkaLocal;
+    }
+
+
+    public Set<String> listTopics() {
+        try (AdminClient admin = buildAdminClient()) {
+            ListTopicsResult result = admin.listTopics();
+            return result.names().get();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list topics", e);
+        }
+    }
+
+    public void createTopic(String topicName, int numPartitions) {
+        NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
+        try (AdminClient admin = buildAdminClient()) {
+            CreateTopicsResult result = admin.createTopics(singletonList(newTopic));
+            result.values().get(topicName).get();
+            LOG.info(format("created topic %s", topicName));
+        } catch (Exception e) {
+            throw new RuntimeException(format("Failed to create topic %s", topicName), e);
+        }
+    }
+
+    public void deleteTopic(String topicName) {
+        try (AdminClient admin = buildAdminClient()) {
+            DeleteTopicsResult result = admin.deleteTopics(Collections.singleton(topicName));
+            result.all().get();
+            LOG.info(format("deleted topic %s", topicName));
+        } catch (Exception e) {
+            throw new RuntimeException(format("Failed to delete topic %s", topicName), e);
+        }
+    }
+
+    private static int randomAvailablePort() {
+        try (ServerSocket ss = new ServerSocket(0)) {
+            return ss.getLocalPort();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private AdminClient buildAdminClient() {
+        return create(singletonMap(BOOTSTRAP_SERVERS_CONFIG, kafkaLocal.getKafkaBootstrapServer()));
+    }
+
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaLocal.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaLocal.java
new file mode 100644
index 0000000..e55e136
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaLocal.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka.setup;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+import static java.lang.String.format;
+
+public class KafkaLocal {
+
+    private final KafkaServerStartable server;
+
+    private final String kafkaBootstrapServer;
+
+    public KafkaLocal(Map<String, Object> kafkaProperties) {
+        KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+        kafkaBootstrapServer = format("%s:%s", kafkaConfig.hostName(), kafkaConfig.port());
+        server = new KafkaServerStartable(kafkaConfig);
+        server.startup();
+    }
+
+    public void stop() {
+        server.shutdown();
+    }
+
+    public String getKafkaBootstrapServer() {
+        return kafkaBootstrapServer;
+    }
+
+    public static Map<String, Object> getKafkaProperties(String logDir, int port, String zkConnect) {
+        Map<String, Object> props = new HashMap<>();
+        props.put("host.name", "localhost");
+        props.put("log.dir", logDir);
+        props.put("port", port);
+        props.put("zookeeper.connect", zkConnect);
+        return props;
+    }
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/ZooKeeperLocal.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/ZooKeeperLocal.java
new file mode 100644
index 0000000..ea479ff
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/ZooKeeperLocal.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka.setup;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+public class ZooKeeperLocal {
+
+    private final ZooKeeperServerMain server;
+
+    public ZooKeeperLocal(Properties zkProperties) {
+        QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
+        try {
+            quorumPeerConfig.parseProperties(zkProperties);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        ServerConfig serverConfig = new ServerConfig();
+        serverConfig.readFrom(quorumPeerConfig);
+
+        server = new ZooKeeperServerMain();
+
+        Thread dt = new Thread(runnable(serverConfig));
+        dt.setDaemon(true);
+        dt.start();
+    }
+
+    public void stop() {
+        try {
+            Method shutdown = server.getClass().getDeclaredMethod("shutdown");
+            shutdown.setAccessible(true);
+            shutdown.invoke(server);
+        } catch (Exception e) {
+            throw new RuntimeException();
+        }
+    }
+
+    public static Properties getZooKeeperProperties(String dataDirectory, int port) {
+        Properties props = new Properties();
+        props.put("dataDir", dataDirectory);
+        props.put("clientPort", port);
+        return props;
+    }
+
+    private Runnable runnable(ServerConfig serverConfig) {
+        return () -> {
+            try {
+                server.runFromConfig(serverConfig);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
index b888272..674864a 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -19,13 +19,12 @@
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
 
 import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Messaging;
 import org.apache.aries.events.api.Position;
-import org.apache.aries.events.api.Received;
-import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
 import org.apache.aries.events.api.Type;
 import org.osgi.service.component.annotations.Component;
@@ -33,33 +32,39 @@
 @Component
 @Type("memory")
 public class InMemoryMessaging implements Messaging {
-    private Map<String, Topic> topics = new ConcurrentHashMap<>();
+    private final Map<String, Topic> topics = new ConcurrentHashMap<>();
+    private final int keepAtLeast;
+    
+    public InMemoryMessaging() {
+        this(10000);
+    }
 
-    @Override
-    public Position send(String topicName, Message message) {
-        Topic topic = getOrCreate(topicName);
-        return topic.send(message);
+    public InMemoryMessaging(int keepAtLeast) {
+        this.keepAtLeast = keepAtLeast;
+        
     }
 
     @Override
-    public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
+    public void send(String topicName, Message message) {
         Topic topic = getOrCreate(topicName);
-        return topic.subscribe(position, seek, callback);
+        topic.send(message);
     }
 
     @Override
-    public Message newMessage(byte[] payload, Map<String, String> props) {
-        return new MemoryMessage(payload, props);
+    public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
+        SubscribeRequest request = requestBuilder.build();
+        Topic topic = getOrCreate(request.getTopic());
+        return topic.subscribe(request);
     }
 
     @Override
     public Position positionFromString(String position) {
-        long offset = new Long(position).longValue();
+        long offset = Long.parseLong(position);
         return new MemoryPosition(offset);
     }
 
     private Topic getOrCreate(String topicName) {
-        return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2));
+        return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2, keepAtLeast));
     }
 
 }
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
index 4fb5d0a..15c9585 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
@@ -17,22 +17,40 @@
  */
 package org.apache.aries.events.memory;
 
+import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class Journal<T> {
-    private AtomicLong nextOffset = new AtomicLong();
-    private ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+class Journal<T> {
+    private final int keepAtLeast;
+    private final AtomicLong nextOffset = new AtomicLong();
+    private final ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+    private final AtomicLong count = new AtomicLong();
+    
+    public Journal(int keepAtLeast) {
+        this.keepAtLeast = keepAtLeast;
+    }
     
     public long append(T message) {
+        if (count.incrementAndGet() > keepAtLeast * 2) {
+            evict();
+        }
         Long offset = nextOffset.getAndIncrement();
         messages.put(offset, message);
         return offset;
     }
 
+    private synchronized void evict() {
+        Iterator<Long> it = messages.keySet().iterator();
+        for (int c = 0; c < keepAtLeast; c++) {
+            messages.remove(it.next());
+        }
+        count.set(0);
+    }
+
     public long getFirstOffset() {
         try {
             return messages.firstKey();
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
index 694c42d..fab1bea 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
@@ -27,12 +27,17 @@
         this.offset = offset;
     }
 
-    public long getOffset() {
+    long getOffset() {
         return offset;
     }
 
     @Override
-    public String toString() {
-        return new Long(offset).toString();
+    public String positionToString() {
+        return Long.toString(offset);
+    }
+
+    @Override
+    public int compareTo(Position p) {
+        return Long.compare(offset, ((MemoryPosition)p).offset);
     }
 }
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index 02bea04..91a6d71 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -17,106 +17,107 @@
  */
 package org.apache.aries.events.memory;
 
-import java.util.HashSet;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.aries.events.api.Message;
 import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
 import org.apache.aries.events.api.Subscription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Topic {
-    private Logger log = LoggerFactory.getLogger(this.getClass());
+class Topic {
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
 
-    private String topicName;
-    private Journal<Message> journal;
-    private Set<Subscription> subscriptions = new HashSet<>();
+    private final String topicName;
+    private final Journal<Message> journal;
 
-    public Topic(String topicName) {
+    public Topic(String topicName, int keepAtLeast) {
         this.topicName = topicName;
-        this.journal = new Journal<>();
+        this.journal = new Journal<>(keepAtLeast);
     }
 
-    public Position send(Message message) {
+    public synchronized Position send(Message message) {
         long offset = this.journal.append(message);
+        notifyAll();
         return new MemoryPosition(offset);
     }
 
-    public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
-        long startOffset = getStartOffset(position, seek);
+    public Subscription subscribe(SubscribeRequest request) {
+        long startOffset = getStartOffset((MemoryPosition) request.getPosition(), request.getSeek());
         log.debug("Consuming from " + startOffset);
-        return new TopicSubscription(startOffset, callback);
+        return new TopicSubscription(startOffset, request.getCallback());
     }
 
-    private long getStartOffset(Position position, Seek seek) {
+    private long getStartOffset(MemoryPosition position, Seek seek) {
         if (position != null) {
             return position.getOffset();
         } else {
             if (seek == Seek.earliest) {
                 return this.journal.getFirstOffset();
-            } else if (seek == Seek.latest) {
-                return this.journal.getLastOffset() + 1;
             } else {
-                throw new IllegalArgumentException("Seek must not be null");
+                return this.journal.getLastOffset() + 1;
             }
         }
     }
 
+    private synchronized Entry<Long, Message> waitNext(long currentOffset) throws InterruptedException {
+        Entry<Long, Message> entry = journal.getNext(currentOffset);
+        if (entry != null) {
+            return entry;
+        }
+        log.debug("Waiting for next message");
+        wait();
+        return journal.getNext(currentOffset);
+    }
+
     class TopicSubscription implements Subscription {
         private Consumer<Received> callback;
         private ExecutorService executor;
-        private volatile boolean running;
         private long currentOffset;
 
         TopicSubscription(long startOffset, Consumer<Received> callback) {
             this.currentOffset = startOffset;
             this.callback = callback;
-            this.running = true;
             String name = "Poller for " + topicName;
             this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
             this.executor.execute(this::poll);
         }
-        
+
         private void poll() {
-            while (running) {
-                Entry<Long, Message> entry = journal.getNext(currentOffset);
-                if (entry != null) {
-                    long offset = entry.getKey();
-                    try {
-                        MemoryPosition position = new MemoryPosition(this.currentOffset);
-                        Received received = new Received(position, entry.getValue());
-                        callback.accept(received);
-                    } catch (Exception e) {
-                        log.warn(e.getMessage(), e);
-                    }
-                    this.currentOffset = offset + 1;
-                } else {
-                    try {
-                        Thread.sleep(100);
-                    } catch (InterruptedException e) {
+            try {
+                while (true) {
+                    Entry<Long, Message> entry = waitNext(currentOffset);
+                    if (entry != null) {
+                        handleMessage(entry);
                     }
                 }
+            } catch (InterruptedException e) {
+                log.debug("Poller thread for consumer on topic " + topicName + " stopped.");
             }
         }
 
+        private void handleMessage(Entry<Long, Message> entry) {
+            long offset = entry.getKey();
+            try {
+                MemoryPosition position = new MemoryPosition(this.currentOffset);
+                Received received = new Received(position, entry.getValue());
+                callback.accept(received);
+            } catch (Exception e) {
+                log.warn(e.getMessage(), e);
+            }
+            this.currentOffset = offset + 1;
+        }
+
         @Override
         public void close() {
-            this.running = false;
             executor.shutdown();
-            try {
-                executor.awaitTermination(10, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-            subscriptions.remove(this);
+            executor.shutdownNow();
         }
 
     }
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MemoryPositionTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MemoryPositionTest.java
new file mode 100644
index 0000000..7613d2c
--- /dev/null
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MemoryPositionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.aries.events.memory;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MemoryPositionTest {
+
+    @Test
+    public void testCompareTo() throws Exception {
+        assertEquals(0, comparePositions(position(5),  position(5)));
+        assertEquals(1, comparePositions(position(10), position(5)));
+        assertEquals(-1, comparePositions(position(2), position(5)));
+    }
+
+    private int comparePositions(MemoryPosition position1, MemoryPosition position2) {
+        return position1.compareTo(position2);
+    }
+
+    private MemoryPosition position(long offset) {
+        return new MemoryPosition(offset);
+    }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
index a8262bd..5edee5d 100644
--- a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -1,8 +1,12 @@
 package org.apache.aries.events.memory;
 
+import static org.apache.aries.events.api.SubscribeRequestBuilder.to;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.MockitoAnnotations.initMocks;
@@ -13,6 +17,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -21,6 +26,7 @@
 import org.apache.aries.events.api.Position;
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
 import org.apache.aries.events.api.Subscription;
 import org.junit.After;
 import org.junit.Before;
@@ -28,9 +34,12 @@
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 
 public class MessagingTest {
     
+    private static final long MAX_MANY = 100000l;
+
     @Mock
     private Consumer<Received> callback;
     
@@ -55,62 +64,69 @@
     @Test
     public void testPositionFromString() {
         Position pos = messaging.positionFromString("1");
-        assertThat(pos.getOffset(), equalTo(1l));
+        assertThat(pos.compareTo(new MemoryPosition(1)), equalTo(0));
+        assertThat(pos.positionToString(), equalTo("1"));
     }
     
     @Test
     public void testSend() {
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback).seek(Seek.earliest));
         String content = "testcontent";
-        Position pos = send("test", content);
-        assertThat(pos.toString(), equalTo("0"));
-        
-        verify(callback, timeout(1000)).accept(messageCaptor.capture());
+        send("test", content);
+        assertMessages(1);
         Received received = messageCaptor.getValue();
         assertThat(received.getMessage().getPayload(), equalTo(toBytes(content)));
-        assertThat(received.getPosition().getOffset(), equalTo(0l));
+        assertEquals(0, received.getPosition().compareTo(new MemoryPosition(0)));
         assertThat(received.getMessage().getProperties().size(), equalTo(1));
         assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
     }
     
-    @Test(expected=IllegalArgumentException.class)
-    public void testInvalid() {
-        messaging.subscribe("test", null, null, callback);
+    @Test(expected=NullPointerException.class)
+    public void testInvalidSubscribe() {
+        subscribe(to("test", callback).seek(null));
+    }
+    
+    @Test
+    public void testExceptionInHandler() {
+        doThrow(new RuntimeException("Expected exception")).when(callback).accept(Mockito.any(Received.class));
+        subscribe(to("test", callback));
+        send("test", "testcontent");
+        assertMessages(1);
     }
 
     @Test
     public void testEarliestBefore() {
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback).seek(Seek.earliest));
         send("test", "testcontent");
         send("test", "testcontent2");
-        verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+        assertMessages(2);
         assertThat(messageContents(), contains("testcontent", "testcontent2"));
     }
-    
+
     @Test
     public void testEarliestAfter() {
         send("test", "testcontent");
-        subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+        subscribe(to("test", callback).seek(Seek.earliest));
         send("test", "testcontent2");
-        verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+        assertMessages(2);
         assertThat(messageContents(), contains("testcontent", "testcontent2"));
     }
     
     @Test
     public void testLatestBefore() {
-        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        subscribe(to("test", callback));
         send("test", "testcontent");
         send("test", "testcontent2");
-        verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+        assertMessages(2);
         assertThat(messageContents(), contains("testcontent", "testcontent2"));
     }
     
     @Test
     public void testLatest() {
         send("test", "testcontent");
-        subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+        subscribe(to("test", callback));
         send("test", "testcontent2");
-        verify(callback, timeout(1000)).accept(messageCaptor.capture());
+        assertMessages(1);
         assertThat(messageContents(), contains("testcontent2"));
     }
     
@@ -118,10 +134,33 @@
     public void testFrom1() {
         send("test", "testcontent");
         send("test", "testcontent2");
-        subscriptions.add(messaging.subscribe("test", new MemoryPosition(1l), Seek.earliest, callback));
-        verify(callback, timeout(1000)).accept(messageCaptor.capture());
+        subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
+        assertMessages(1);
         assertThat(messageContents(), contains("testcontent2"));
     }
+    
+    @Test
+    public void testMany() {
+        AtomicLong count = new AtomicLong();
+        Consumer<Received> manyCallback = rec -> { count.incrementAndGet(); };
+        messaging.subscribe(to("test", manyCallback));
+        for (long c=0; c < MAX_MANY; c++) {
+            send("test", "content " + c);
+            if (c % 10000 == 0) {
+                System.out.println("Sending " + c);
+            }
+            
+        }
+        await().until(count::get, equalTo(MAX_MANY)); 
+    }
+    
+    private void assertMessages(int num) {
+        verify(callback, timeout(1000).times(num)).accept(messageCaptor.capture());
+    }
+
+    private void subscribe(SubscribeRequestBuilder request) {
+        this.subscriptions.add(messaging.subscribe(request));
+    }
 
     private List<String> messageContents() {
         return messageCaptor.getAllValues().stream()
@@ -132,11 +171,11 @@
         return new String(rec.getMessage().getPayload(), Charset.forName("UTF-8"));
     }
     
-    private Position send(String topic, String content) {
+    private void send(String topic, String content) {
         Map<String, String> props = new HashMap<String, String>();
         props.put("my", "testvalue");
-        Message message = messaging.newMessage(toBytes(content), props);
-        return messaging.send(topic, message);
+        Message message = new Message(toBytes(content), props);
+        messaging.send(topic, message);
     }
 
     private byte[] toBytes(String content) {
diff --git a/org.apache.aries.events.mongo/pom.xml b/org.apache.aries.events.mongo/pom.xml
new file mode 100644
index 0000000..34cb08b
--- /dev/null
+++ b/org.apache.aries.events.mongo/pom.xml
@@ -0,0 +1,43 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements. See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership. The SF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied. See the License for the
+  ~ specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.aries.events</groupId>
+        <artifactId>org.apache.aries.events</artifactId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <groupId>org.apache.aries.events.mongo</groupId>
+    <artifactId>org.apache.aries.events.mongo</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.aries.events</groupId>
+            <artifactId>org.apache.aries.events.api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>3.8.2</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/CachingFactory.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/CachingFactory.java
new file mode 100644
index 0000000..5b7fdc6
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/CachingFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A factory that keeps previously created instances in a cache so that
+ * they will get reused if requested repeatedly.
+ * Currently there is no cache size limit implemented so this implementation
+ * is only good for the use case with limited parameter space.
+ * @param <K> key type. Serves as cache key as well as an input parameter for the
+ *           factory method. Must provide sensible implementations for
+ *          equals and hashCode methods
+ * @param <V> result type.
+ */
+public final class CachingFactory<K, V extends AutoCloseable> implements Closeable {
+    
+
+    public static <K2, V2 extends AutoCloseable> CachingFactory<K2, V2> cachingFactory(Function<K2, V2> create) {
+        return new CachingFactory<K2, V2>(create);
+    }
+
+    /**
+     * Find or created a value for the specified key
+     * @param arg key instance
+     * @return either an existing (cached) value of newly created one.
+     */
+    public synchronized V get(K arg) {
+        return cache.computeIfAbsent(arg, create);
+    }
+
+    /**
+     * Clears all cached instances properly disposing them.
+     */
+    public synchronized void clear() {
+        cache.values().stream()
+            .forEach(CachingFactory::safeClose);
+        cache.clear();
+    }
+
+    /**
+     * Closing this factory properly disposing all cached instances
+     */
+    @Override
+    public void close() {
+        clear();
+    }
+
+    //*********************************************
+    // Private
+    //*********************************************
+
+    private static final Logger LOG = getLogger(CachingFactory.class);
+    private final Map<K, V> cache = new HashMap<K, V>();
+    private final Function<K, V> create;
+
+    private CachingFactory(Function<K, V> create) {
+        this.create = create;
+    }
+
+    private static void safeClose(AutoCloseable closable) {
+        try {
+            closable.close();
+        } catch (Exception e) {
+            LOG.warn(e.getMessage(), e);
+        }
+    }
+
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/Common.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/Common.java
new file mode 100644
index 0000000..be5bc76
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/Common.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.client.MongoCollection;
+import org.bson.Document;
+
+import static com.mongodb.client.model.Filters.lte;
+import static com.mongodb.client.model.Indexes.descending;
+import static org.apache.aries.events.mongo.Common.Fields.INDEX;
+
+/**
+ * Common string definitions
+ */
+@SuppressWarnings({"HardCodedStringLiteral", "InterfaceNeverImplemented"})
+interface Common {
+
+    String DEFAULT_DB_NAME = "aem-replication";
+
+    /** MongoDB field names */
+    interface Fields {
+        String INDEX = "i";
+        String TIME_STAMP = "t";
+        String PAYLOAD = "d";
+        String PROPS = "p";
+    }
+
+    /**
+     * Returns the next available index in the collection
+     * @param col collection to check. The collection must contain
+     *            log messages published by a Publisher instance
+     * @return the index that should be assigned to the next message when
+     * it gets published
+     */
+    static long upcomingIndex(MongoCollection<Document> col) {
+        Document doc = col.find(lte(INDEX, Long.MAX_VALUE))
+                          .sort(descending(INDEX))
+                          .first();
+        if (doc != null) {
+            long latestAvailable = doc.getLong(INDEX);
+            return latestAvailable + 1L;
+        } else {
+            return 0L;
+        }
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiver.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiver.java
new file mode 100644
index 0000000..1a9626e
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.apache.aries.events.api.Message;
+
+public interface MessageReceiver extends AutoCloseable {
+
+    /** returns data entry for the specified offset.
+     * If necessary waits until data is available.
+     * If data entry at the specified offset has
+     * been evicted, throws NoSuchElement exception
+     * @param index an offset to the desired entry
+     * @return requested data entry together with the
+     *         offset for the next data entry
+     */
+    Message receive(long index) throws InterruptedException;
+
+    /** returns the index of the earliest available
+     * data entry. It also causes the receiver to
+     * pre-fetch and cache a batch of earliest available
+     * entries thus giving the user a chance to consume
+     * them and catch up before they get evicted
+     * @return an index of the first available data entry or
+     * 0 if the log is empty
+     */
+    long earliestIndex();
+
+    /** returns the index of the next available data
+     *  entry.
+     *  The returned index points to the entry yet to
+     *  be inserted into the log.
+     *  @return index of the data entry that will be
+     *  inserted next. 0 if the log is empty.
+     */
+    long latestIndex();
+
+    @Override
+    void close();
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java
new file mode 100644
index 0000000..f03f701
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+import com.mongodb.Mongo;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.mongo.Common.Fields;
+import org.bson.Document;
+import org.bson.types.Binary;
+import org.slf4j.Logger;
+
+import static java.lang.Math.min;
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+import static java.util.Collections.emptyList;
+import static org.apache.aries.events.mongo.Common.Fields.PAYLOAD;
+import static org.apache.aries.events.mongo.Common.Fields.INDEX;
+import static org.apache.aries.events.mongo.Common.upcomingIndex;
+import static org.slf4j.LoggerFactory.getLogger;
+
+final class MessageReceiverImpl implements MessageReceiver {
+
+    static MessageReceiver messageReceiver(MongoCollection<Document> col) {
+        return new MessageReceiverImpl(col, Optional.empty());
+    }
+
+    @Override
+    public Message receive(long index) throws InterruptedException {
+        fetch(index);
+        long bufferIndex = index - firstIndex;
+        assert bufferIndex < buffer.size() : bufferIndex + ", " + buffer.size();
+        return buffer.get((int) bufferIndex);
+    }
+
+    @Override
+    public long earliestIndex() {
+       refreshBuffer(FIRST_AVAILABLE);
+       return firstIndex;
+    }
+
+    @Override
+    public long latestIndex() {
+        long result = upcomingIndex(col);
+        if (result > 0) {
+            result -= 1;
+        }
+        return result;
+    }
+
+    @Override
+    public void close() {
+        // MongoDB driver doesn't like to be interruped so
+        // we try to get out of the poll loop in a gentle way
+        interrupted = true;
+        mongoClient.ifPresent(Mongo::close);
+    }
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    private static final Logger LOGGER = getLogger(MessageReceiverImpl.class);
+    private static final long FINE_GRAINED_DELAY = 100L;
+    private static final long FIRST_AVAILABLE = -1;
+    private final Optional<MongoClient> mongoClient;
+    private final MongoCollection<Document> col;
+    private long maxWaitTime = 1000L;
+    private int fetchLimit = 100;
+    private long lastReceived = currentTimeMillis();
+    private long firstIndex = 0L;
+    private List<Message> buffer = emptyList();
+    private volatile boolean interrupted = false;
+
+    private MessageReceiverImpl(MongoCollection<Document> col, Optional<MongoClient> mongoClient) {
+        LOGGER.debug("Creating new receiver: " + col.getNamespace().getCollectionName());
+        this.mongoClient = mongoClient;
+        this.col = col;
+    }
+
+    private void fetch(long index) throws InterruptedException {
+        while (firstIndex > index || firstIndex + buffer.size() <= index) {
+            long delay = min(maxWaitTime, (currentTimeMillis() - lastReceived) / 2);
+            adaptivePause(delay);
+            refreshBuffer(index);
+        }
+    }
+
+    private void refreshBuffer(long index) {
+        long startIndex = index;
+        try (MongoCursor<Document> cursor = col.find(Filters.gte(INDEX, startIndex)).iterator()) {
+            List<Message> collected = new ArrayList<>(fetchLimit);
+            while (cursor.hasNext()) {
+                int i = collected.size();
+                Document document = cursor.next();
+                long idx = document.get(INDEX, Long.class);
+                if (startIndex == FIRST_AVAILABLE) {
+                    startIndex = idx;
+                }
+                if (idx == startIndex + i) {
+                    Binary payload = document.get(PAYLOAD, Binary.class);
+                    Map<String, String> props = (Map<String, String>) document.get(Fields.PROPS);
+                    Message message = new Message(payload.getData(), props);
+                    collected.add(message);
+                } else {
+                    if (i == 0) {
+                        throw new NoSuchElementException("Element [" + startIndex + "] has been evicted from the log. Oldest available: [" + idx + "]");
+                    } else {
+                        throw new IllegalStateException("Missing element at [" + (startIndex + i) + "]. Next available at [" + idx + "]");
+                    }
+                }
+            }
+            buffer = collected;
+            firstIndex = (startIndex == FIRST_AVAILABLE) ? 0L : startIndex;
+            if (collected.size() > 0) {
+                lastReceived = currentTimeMillis();
+            }
+        }
+    }
+
+    @SuppressWarnings("BusyWait")
+    private void adaptivePause(long ms) throws InterruptedException {
+        if (interrupted) {
+            throw new InterruptedException();
+        }
+        long currentTime = currentTimeMillis();
+        long stopTime = currentTime + ms;
+        while (currentTime < stopTime) {
+            if (interrupted) {
+                throw new InterruptedException();
+            }
+            sleep(min(FINE_GRAINED_DELAY, ms));
+            currentTime = currentTimeMillis();
+        }
+    }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSender.java
similarity index 63%
rename from org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
rename to org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSender.java
index 77815ce..cd187f6 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSender.java
@@ -15,30 +15,27 @@
  * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package org.apache.aries.events.memory;
-
-import java.util.Map;
+package org.apache.aries.events.mongo;
 
 import org.apache.aries.events.api.Message;
 
-class MemoryMessage implements Message {
+import java.util.Map;
 
-    private byte[] payload;
-    private Map<String, String> properties;
+/**
+ * Provides an API for publishing data to a distribution log
+ */
+public interface MessageSender extends AutoCloseable {
 
-    MemoryMessage(byte[] payload, Map<String, String> props) {
-        this.payload = payload;
-        properties = props;
-    }
-
-    @Override
-    public byte[] getPayload() {
-        return this.payload;
-    }
-
-    @Override
-    public Map<String, String> getProperties() {
-        return this.properties;
-    }
+    /**
+     * Publishes a single message to a log.
+     * @param message specifies a message to publish.
+     *             The following value types are supported:
+     *             Integer
+     *             Long
+     *             Boolean
+     *             String
+     *             byte[]
+     */
+    void send(Message message);
 
 }
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java
new file mode 100644
index 0000000..7f60be1
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.MongoWriteException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.IndexOptions;
+import org.apache.aries.events.api.Message;
+import org.bson.Document;
+import org.slf4j.Logger;
+
+import java.util.function.Consumer;
+
+import static com.mongodb.client.model.Filters.lt;
+import static java.lang.System.currentTimeMillis;
+import static org.apache.aries.events.mongo.Common.Fields.INDEX;
+import static org.apache.aries.events.mongo.Common.Fields.PAYLOAD;
+import static org.apache.aries.events.mongo.Common.Fields.PROPS;
+import static org.apache.aries.events.mongo.Common.Fields.TIME_STAMP;
+import static org.apache.aries.events.mongo.Common.upcomingIndex;
+import static org.slf4j.LoggerFactory.getLogger;
+
+final class MessageSenderImpl implements MessageSender {
+
+    //*********************************************
+    // Creation
+    //*********************************************
+
+    static MessageSender messageSender(MongoCollection<Document> col, long maxAge) {
+        return new MessageSenderImpl(col, maxAge);
+    }
+
+    //*********************************************
+    // Specialization
+    //*********************************************
+
+    @Override
+    public void send(Message message) {
+        publish1(message, 3);
+        evict();
+    }
+
+    @Override
+    public void close() {}
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    private static final Logger LOGGER = getLogger(MessageSenderImpl.class);
+    private final MongoCollection<Document> collection;
+    private long nextEvictionTime = 0L;
+    private final long maxAge;
+
+    private MessageSenderImpl(MongoCollection<Document> collection, long maxAge) {
+        LOGGER.debug("Creating new publisher: " + collection.getNamespace().getCollectionName());
+        ensureIndexes(collection);
+        this.collection = collection;
+        this.maxAge = maxAge;
+    }
+
+    private void evict() {
+        long currentTime = currentTimeMillis();
+        if (currentTime > nextEvictionTime) {
+            doEvict(currentTime - maxAge);
+            nextEvictionTime = oldestTimeStamp() + maxAge;
+        }
+    }
+
+    /**
+     * Deletes documents that are older then specified threshold but preserving at least one document.
+     * At least one document is needed in the collection in order to keep track of the oldest time stamp.
+     * @param threshold time threshold (ms). Documents older then specified by threshold are removed.
+     */
+    private void doEvict(long threshold) {
+        collection.find()
+                  .projection(new Document(TIME_STAMP, 1))
+                  .sort(new Document(TIME_STAMP, -1))
+                  .limit(1)
+                  .forEach((Consumer<Document>) doc -> {
+               long newestTimeStamp = timeStamp(doc);
+               long adjustedThreshold = Math.min(threshold, newestTimeStamp);
+               collection.deleteMany(lt(TIME_STAMP, adjustedThreshold));
+           });
+    }
+
+    private void publish1(Message message, int retry) {
+        try {
+            long index = upcomingIndex(collection);
+            collection.insertOne(createDoc(index, message));
+        } catch (MongoWriteException e) {
+            if (retry > 0) {
+                publish1(message, retry - 1);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    private long oldestTimeStamp() {
+        try (MongoCursor<Document> docs = collection.find().sort(new Document(TIME_STAMP, 1)).iterator()) {
+            return docs.hasNext() ? docs.next().get(TIME_STAMP, Long.class) : 0L;
+        }
+    }
+
+    private static long timeStamp(Document doc) {
+        return doc.get(TIME_STAMP, Long.class);
+    }
+
+    private Document createDoc(long index, Message message) {
+        Document result = new Document();
+        result.put(INDEX,      index);
+        result.put(TIME_STAMP, currentTimeMillis());
+        result.put(PAYLOAD,    message.getPayload());
+        result.put(PROPS,      message.getProperties());
+        return result;
+    }
+
+    private void ensureIndexes(MongoCollection<Document> col) {
+        col.createIndex(new Document(INDEX, 1), new IndexOptions().unique(true));
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoEndpoint.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoEndpoint.java
new file mode 100644
index 0000000..4be126d
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoEndpoint.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+@ObjectClassDefinition(
+        name        = "MongoDB configuration",
+        description = "Mongodb URI"
+)
+public @interface MongoEndpoint {
+
+    @AttributeDefinition(
+            name        = "Mongo URI",
+            description = "Specifies mongodb URI as it is specified here: https://docs.mongodb.com/manual/reference/connection-string/ "
+    )
+    String mongoUri() default "mongodb://localhost:27017/aem_distribution";
+
+    @AttributeDefinition(
+            name        = "Max Age",
+            description = "Log retention time expressed in milliseconds"
+    )
+    long maxAge() default 1000L * 3600 * 24 * 7; // One week in ms
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
new file mode 100644
index 0000000..454efde
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
+import org.apache.aries.events.api.Subscription;
+import org.bson.Document;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.Designate;
+
+import java.util.Optional;
+
+import static org.apache.aries.events.mongo.Common.DEFAULT_DB_NAME;
+import static org.apache.aries.events.mongo.MongoPosition.index;
+import static org.apache.aries.events.mongo.MongoPosition.position;
+import static org.apache.aries.events.mongo.MongoSubscription.subscription;
+import static org.apache.aries.events.mongo.MessageSenderImpl.messageSender;
+import static org.apache.aries.events.mongo.MessageReceiverImpl.messageReceiver;
+import static org.apache.aries.events.mongo.CachingFactory.cachingFactory;
+import static org.osgi.service.component.annotations.ConfigurationPolicy.REQUIRE;
+
+@Component(service = Messaging.class, configurationPolicy = REQUIRE)
+@Designate(ocd = MongoEndpoint.class)
+public class MongoMessaging implements Messaging {
+
+    @Override
+    public void send(String topic, Message message) {
+        MessageSender sender = senderFactory.get(topic);
+        sender.send(message);
+    }
+
+    @Override
+    public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
+        SubscribeRequest request = requestBuilder.build();
+        MongoCollection<Document> collection = database.getCollection(request.getTopic());
+        MessageReceiver receiver = messageReceiver(collection);
+        return subscription(receiver, index(request.getPosition()), request.getSeek(), request.getCallback());
+    }
+
+    @Override
+    public Position positionFromString(String position) {
+        long index = Long.parseLong(position);
+        return position(index);
+    }
+
+    // *******************************************************
+    // Private
+    // *******************************************************
+
+    private CachingFactory<String, MessageSender> senderFactory;
+    private MongoClient client;
+    private MongoDatabase database;
+
+    @Activate
+    protected void activate(MongoEndpoint config) {
+        MongoClientURI uri = new MongoClientURI(config.mongoUri());
+        client = new MongoClient(uri);
+        String dbName = Optional.ofNullable(uri.getDatabase()).orElse(DEFAULT_DB_NAME);
+        this.database = client.getDatabase(dbName);
+        this.senderFactory = cachingFactory(topic -> {
+            MongoCollection<Document> collection = database.getCollection(topic);
+            return messageSender(collection, config.maxAge());
+        });
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        client.close();
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
new file mode 100644
index 0000000..910b15a
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.apache.aries.events.api.Position;
+
+class MongoPosition implements Position {
+
+    static Position position(long index) {
+        return new MongoPosition(index);
+    }
+
+    static long index(Position position) {
+        return ((MongoPosition) position).index;
+    }
+
+    @Override
+    public String positionToString() {
+        return String.valueOf(index);
+    }
+
+    @Override
+    public int compareTo(Position o) {
+        long thatIndex = ((MongoPosition) o).index;
+        if (this.index > thatIndex) return 1;
+        if (this.index == thatIndex) return 0;
+	return -1;
+    }
+
+    // *******************************************************
+    // Private
+    // *******************************************************
+
+    private final long index;
+
+    private MongoPosition(long index) {
+	this.index = index;
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java
new file mode 100644
index 0000000..f991502
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.slf4j.Logger;
+
+import java.util.function.Consumer;
+
+import static java.lang.Thread.currentThread;
+import static java.lang.Thread.interrupted;
+import static org.apache.aries.events.mongo.MongoPosition.position;
+import static org.slf4j.LoggerFactory.getLogger;
+
+final class MongoSubscription implements Subscription {
+
+    //*********************************************
+    // Creation
+    //*********************************************
+
+    static MongoSubscription subscription(
+            MessageReceiver receiver, long index, Seek fallBack, Consumer<Received> consumer
+    ) {
+        assert index >= 0L : "Illegal log index: [" + index + "]";
+        return new MongoSubscription(receiver, index, consumer);
+    }
+
+    static MongoSubscription subscription(
+            MessageReceiver receiver, Seek seek, Consumer<Received> consumer
+    ) {
+        switch (seek) {
+            case latest:
+                return new MongoSubscription(receiver, LATEST_INDEX, consumer);
+            case earliest:
+                return new MongoSubscription(receiver, EARLIEST_INDEX, consumer);
+            default:
+                throw new AssertionError(seek);
+        }
+    }
+
+    //*********************************************
+    // Package interface
+    //*********************************************
+
+    long index() {
+        return index;
+    }
+
+    //*********************************************
+    // Specialization
+    //*********************************************
+
+    @Override
+    public void close() {
+        receiver.close();
+    }
+
+    @Override
+    public String toString() {
+        return "Subscription" + receiver + '[' + index + ']';
+    }
+
+    //*********************************************
+    // Private
+    //*********************************************
+
+    private static final long LATEST_INDEX = -1;
+    private static final long EARLIEST_INDEX = -2;
+    private static final Logger LOGGER = getLogger(MongoSubscription.class);
+    private final MessageReceiver receiver;
+    private long index;
+    private final Consumer<Received> consumer;
+
+    private MongoSubscription(
+            MessageReceiver receiver, long index, Consumer<Received> consumer
+    ) {
+        this.consumer = consumer;
+        this.receiver = receiver;
+        if (index == EARLIEST_INDEX) {
+            this.index = receiver.earliestIndex();
+        } else if (index == LATEST_INDEX) {
+            this.index = receiver.latestIndex();
+        } else {
+            this.index = index;
+        }
+        this.index = index == LATEST_INDEX ? receiver.latestIndex() : index;
+        startBackgroundThread(() -> poll(receiver), "MongoMessageConsumer-" + receiver);
+    }
+
+    private void poll(MessageReceiver receiver) {
+        while (!interrupted()) {
+            try {
+                Message message = receiver.receive(index);
+                LOGGER.debug("Received: " + message);
+                Received received = new Received(position(index), message);
+                consumer.accept(received);
+                index += 1L;
+            } catch (InterruptedException e) {
+                currentThread().interrupt();
+            } catch (Exception e) {
+                LOGGER.error("Error handling message", e);
+            }
+        }
+        LOGGER.debug("Quitting " + this);
+        receiver.close();
+    }
+
+    private static Thread startBackgroundThread(Runnable runnable, String threadName) {
+        Thread thread = new Thread(runnable, threadName);
+        thread.setDaemon(true);
+        thread.start();
+        return thread;
+    }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/MongoProvider.java b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/MongoProvider.java
new file mode 100644
index 0000000..783c702
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/MongoProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.bson.Document;
+import org.junit.rules.ExternalResource;
+
+import java.util.Optional;
+import java.util.logging.Logger;
+
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Provides connection to an external mongodb instance
+ * New database gets created for each test and dropped
+ * afterwards.
+ * Database URL must be provided by mongoUri system
+ * property
+ */
+public class MongoProvider extends ExternalResource {
+
+    MongoCollection<Document> getCollection(String name) {
+        return database.getCollection(name);
+    }
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    private static final String MONGO_URI_PROP = "aries.events.test.mongoUri";
+    private static final String DEFAULT_DB_NAME = "tmp_aries_events_test";
+    private MongoDatabase database;
+    private MongoClient client;
+
+    @Override
+    protected void before() {
+	String mongoUri = mongoUri();
+	client = MongoClients.create(mongoUri);
+	String dbName = Optional.ofNullable(new MongoClientURI(mongoUri).getDatabase())
+		.orElse(DEFAULT_DB_NAME);
+	database = client.getDatabase(dbName);
+    }
+
+    @Override
+    protected void after() {
+        if (database != null) {
+	    database.drop();
+	}
+        if (client != null) {
+	    client.close();
+	}
+    }
+
+    private static String mongoUri() {
+	String result = System.getProperty(MONGO_URI_PROP);
+	if (result == null) {
+	    String message = "No mongo URI provided.\n" +
+		    "  In order to enable mongo tests, define " + MONGO_URI_PROP + " system property\n" +
+		    "  to point to a running instance of mongodb.\n" +
+		    "  Example:\n" +
+		    "        mvn test -D" + MONGO_URI_PROP + "=mongodb://localhost:27017/";
+	    System.out.println("WARNING: " + message);
+	    assumeTrue(message, false);
+	}
+	return result;
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/SenderReceiverTest.java b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/SenderReceiverTest.java
new file mode 100644
index 0000000..f48627d
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/SenderReceiverTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.aries.events.api.Message;
+import org.bson.Document;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.aries.events.mongo.MessageReceiverImpl.messageReceiver;
+import static org.apache.aries.events.mongo.MessageSenderImpl.messageSender;
+import static org.junit.Assert.assertEquals;
+
+public class SenderReceiverTest {
+
+    @Test public void testReplicate() throws InterruptedException {
+        MongoCollection<Document> collection = mongoProvider.getCollection("events");
+        MessageSender sender = messageSender(collection, 1000 * 60 * 60 * 24 * 7);
+        MessageReceiver receiver = messageReceiver(collection);
+        Message expected = new Message(new byte[]{ 1, 2, 3 }, mapOf(
+                keyVal("key1", "val1"),
+                keyVal("key2", "val2"))
+        );
+        sender.send(expected);
+        sender.send(expected);
+        Message actual = receiver.receive(0);
+        assertEquals(expected, actual);
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void testEvicted() throws InterruptedException {
+        MongoCollection<Document> collection = mongoProvider.getCollection("events");
+        MessageSender sender = messageSender(collection, 0);
+        MessageReceiver receiver = messageReceiver(collection);
+        Message expected = new Message(new byte[] { 1, 2, 3}, emptyMap());
+        sender.send(expected);
+        sender.send(expected);
+        receiver.receive(0);
+    }
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    private MongoCollection<Document> collection;
+
+    @Rule
+    public MongoProvider mongoProvider = new MongoProvider();
+
+    private static Map.Entry<String, String> keyVal(String key, String value) {
+        return new SimpleEntry<>(key, value);
+    }
+
+    private static Map<String, String> mapOf(Map.Entry<String, String>... mappings) {
+        Map<String, String> result = new HashMap<>();
+        for (Map.Entry<String, String> entry : mappings) {
+            result.put(entry.getKey(), entry.getValue());
+        }
+        return result;
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 44c5c17..6366297 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,21 +38,19 @@
         <url>https://issues.apache.org/jira/browse/ARIES</url>
     </issueManagement>
 
-    <prerequisites>
-        <maven>3.5</maven>
-    </prerequisites>
-
     <inceptionYear>2018</inceptionYear>
 
     <modules>
         <module>org.apache.aries.events.api</module>
         <module>org.apache.aries.events.memory</module>
+        <module>org.apache.aries.events.mongo</module>
+        <module>org.apache.aries.events.kafka</module>
     </modules>
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <bnd.version>4.1.0</bnd.version>
-        <slf4j.version>1.7.14</slf4j.version>
+        <slf4j.version>1.7.25</slf4j.version>
         <log4j.version>1.2.6</log4j.version>
         <exam.version>4.12.0</exam.version>
     </properties>
@@ -85,6 +83,11 @@
             <artifactId>slf4j-api</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>2.0.0</version>
+        </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>
@@ -95,6 +98,7 @@
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <version>2.20.0</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
@@ -126,6 +130,7 @@
             <groupId>org.apache.servicemix.bundles</groupId>
             <artifactId>org.apache.servicemix.bundles.hamcrest</artifactId>
             <version>1.3_1</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 
@@ -166,12 +171,6 @@
         <pluginManagement>
             <plugins>
                 <plugin>
-                    <groupId>org.apache.felix</groupId>
-                    <artifactId>maven-bundle-plugin</artifactId>
-                    <version>4.1.0</version>
-                    <extensions>true</extensions>
-                </plugin>
-                <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-compiler-plugin</artifactId>
                     <version>3.8.0</version>
@@ -229,7 +228,6 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.8.0</version>
                 <configuration>
                     <source>1.8</source>
                     <target>1.8</target>
@@ -250,7 +248,6 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
-                <version>3.1.0</version>
                 <configuration>
                     <archive>
                         <manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>