Merge pull request #20 from cschneider/master
Build badge
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..bf018d2
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,4 @@
+language: java
+jdk:
+ - oraclejdk8
+
\ No newline at end of file
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
index 13419ee..44b77df 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
@@ -17,13 +17,56 @@
*/
package org.apache.aries.events.api;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
/**
* TODO If we allow wild card consumption then a message also needs a topic
*/
-public interface Message {
- byte[] getPayload();
+public final class Message {
+
+    /** Private copy of the payload bytes; the array itself is never handed out. */
+    private final byte[] payload;
+
+    /** Read-only snapshot of the properties supplied at construction time. */
+    private final Map<String, String> properties;
+
+    /**
+     * Creates a message from copies of the given payload and properties, so
+     * later changes made by the caller to either argument do not affect it.
+     *
+     * @param payload the message body
+     * @param properties the message metadata
+     * @throws NullPointerException if either argument is {@code null}
+     */
+    public Message(byte[] payload, Map<String, String> properties) {
+        this.payload = requireNonNull(payload).clone();
+        this.properties = unmodifiableMap(new HashMap<>(requireNonNull(properties)));
+    }
+
+    /**
+     * @return a fresh copy of the payload on every call
+     */
+    public byte[] getPayload() {
+        return Arrays.copyOf(payload, payload.length);
+    }
- Map<String, String> getProperties();
+
+    /**
+     * @return the unmodifiable properties of this message
+     */
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    /**
+     * Renders the properties only; the payload is not part of the text.
+     */
+    @Override
+    public String toString() {
+        return "Message".concat(properties.toString());
+    }
+
+    /**
+     * Two messages are equal when both their payload bytes and their
+     * properties are equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        Message that = (Message) o;
+        return properties.equals(that.properties)
+                && Arrays.equals(payload, that.payload);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * Objects.hash(properties) + Arrays.hashCode(payload);
+    }
+
}
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
index 36433d9..a88954c 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Messaging.java
@@ -17,9 +17,6 @@
*/
package org.apache.aries.events.api;
-import java.util.Map;
-import java.util.function.Consumer;
-
/**
* Journaled messaging API
*/
@@ -27,27 +24,21 @@
/**
* Send a message to a topic. When this method returns the message
* is safely persisted.
+ *
+ * Messages can be consumed by subscribing to the topic via the {@link #subscribe} method.
+ *
+ * Two messages sent sequentially to the same topic by the same
+ * thread are guaranteed to be consumed in the same order by all subscribers.
*/
- Position send(String topic, Message message);
+ void send(String topic, Message message);
/**
- * Subscribe to a topic. The callback is called for each message received.
- *
- * @param topic to consume from. TODO Do we allow wild cards?
- * @param position in the topic to start consuming from
- * @param seek where to start from when position is not valid or null
- * @param callback will be called for each message received
- * @return Returned subscription must be closed by the caller to unsubscribe
+ * Subscribe to a topic.
+ * The returned subscription must be closed by the caller to unsubscribe.
+ *
+ * @param request to subscribe
*/
- Subscription subscribe(String topic, Position position, Seek seek, Consumer<Received> callback);
-
- /**
- * Create a message with payload and metadata
- * @param payload
- * @param props
- * @return
- */
- Message newMessage(byte[] payload, Map<String, String> props);
+ Subscription subscribe(SubscribeRequestBuilder request);
/**
* Deserialize the position from the string
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
index c6ed99f..e211638 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Position.java
@@ -21,8 +21,15 @@
* Position in the topic.
* E.g. for a Kafka implementation this would be a list of (partition, offset) pairs; as we do not support partitions,
* this could simply be an offset.
- * TODO How do we provide ordering without being too specific?
+ *
+ * The {@code Position} positions are ordered. The relative order between
+ * two positions can be computed by invoking {@code Comparable#compareTo}.
+ * Comparing this position with a specified position will return a negative
+ * integer, zero, or a positive integer as this position happened before,
+ * happened concurrently, or happened after the specified position.
*/
-public interface Position {
- long getOffset();
+public interface Position extends Comparable<Position> {
+
+ /**
+ * Returns this position rendered as a string.
+ * NOTE(review): presumably the counterpart of {@code Messaging#positionFromString};
+ * the Kafka implementation renders it as {@code partition:offset} - confirm
+ * that every implementation is expected to round-trip.
+ *
+ * @return the string form of this position
+ */
+ String positionToString();
+
}
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Seek.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Seek.java
index e47853b..3dd8757 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Seek.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Seek.java
@@ -21,5 +21,14 @@
* Starting position when no Position is available
*/
public enum Seek {
- earliest, latest;
+ // NOTE: the Kafka implementation passes name() of these constants verbatim as the
+ // value of the consumer's AUTO_OFFSET_RESET_CONFIG setting, so renaming a constant
+ // changes what is sent to Kafka.
+
+ /**
+ * Seek to the first position (happened the earliest) on a topic.
+ */
+ earliest,
+
+ /**
+ * Seek to the last position (happened the latest) on a topic.
+ */
+ latest;
}
diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
new file mode 100644
index 0000000..71cf838
--- /dev/null
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/SubscribeRequestBuilder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.aries.events.api;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.function.Consumer;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+/**
+ * Fluent builder for the request passed to {@code Messaging#subscribe}.
+ * Start with {@link #to(String, Consumer)}, optionally refine the request with
+ * {@link #startAt(Position)} and {@link #seek(Seek)}, then hand the builder over.
+ */
+@ParametersAreNonnullByDefault
+public final class SubscribeRequestBuilder {
+
+    /** The request under construction; handed out as-is by {@link #build()}. */
+    private final SubscribeRequest subscribeRequest;
+
+    private SubscribeRequestBuilder(SubscribeRequest subscribeRequest) {
+        this.subscribeRequest = subscribeRequest;
+    }
+
+    /**
+     * Build a subscription request for the given topic and {@code Consumer} callback.
+     *
+     * @param topic to consume from
+     * @param callback to be invoked for each message consumed
+     * @return a new subscription request
+     * @throws NullPointerException if {@code topic} or {@code callback} is {@code null}
+     */
+    public static SubscribeRequestBuilder to(String topic, Consumer<Received> callback) {
+        // Fail fast here rather than later, when the request is actually used to subscribe.
+        requireNonNull(topic, "Topic must not be null");
+        requireNonNull(callback, "Callback must not be null");
+        return new SubscribeRequestBuilder(new SubscribeRequest(topic, callback));
+    }
+
+    /**
+     * Set the {@code Position} position to start consuming from.
+     * When no position is set, the {@link #seek(Seek)} setting decides where
+     * consumption starts.
+     *
+     * @param position in the topic to start consuming from
+     * @return the updated subscribe request
+     */
+    public SubscribeRequestBuilder startAt(Position position) {
+        this.subscribeRequest.position = position;
+        return this;
+    }
+
+    /**
+     * Set the earliest or latest position to start consuming from
+     * when the position is {@code null} or not valid. By default,
+     * seek is set to {@link Seek#latest}.
+     *
+     * @param seek where to start consuming when no valid position is specified
+     * @return the updated subscribe request
+     * @throws NullPointerException if {@code seek} is {@code null}
+     */
+    public SubscribeRequestBuilder seek(Seek seek) {
+        this.subscribeRequest.seek = requireNonNull(seek, "Seek must not be null");
+        return this;
+    }
+
+    /**
+     * Returns the request configured so far. Every call returns the same
+     * instance, so later calls on this builder are visible through it.
+     *
+     * @return the subscribe request held by this builder
+     */
+    public SubscribeRequest build() {
+        return subscribeRequest;
+    }
+
+    /**
+     * The settings of a subscription: topic and callback are fixed at creation,
+     * position and seek are filled in by the enclosing builder.
+     */
+    public static class SubscribeRequest {
+        private final String topic;
+        private final Consumer<Received> callback;
+        /** Start position; {@code null} until {@code startAt} is called. */
+        private Position position;
+        private Seek seek = Seek.latest;
+
+        private SubscribeRequest(String topic, Consumer<Received> callback) {
+            this.topic = topic;
+            this.callback = callback;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        /**
+         * @return the position to start from, or {@code null} when none was set
+         */
+        public Position getPosition() {
+            return position;
+        }
+
+        public Seek getSeek() {
+            return seek;
+        }
+
+        public Consumer<Received> getCallback() {
+            return callback;
+        }
+    }
+}
diff --git a/org.apache.aries.events.kafka/pom.xml b/org.apache.aries.events.kafka/pom.xml
new file mode 100644
index 0000000..bf85186
--- /dev/null
+++ b/org.apache.aries.events.kafka/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.events</groupId>
+ <artifactId>org.apache.aries.events</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>org.apache.aries.events.kafka</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.aries.events</groupId>
+ <artifactId>org.apache.aries.events.api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Apache Kafka -->
+ <dependency>
+ <groupId>org.apache.servicemix.bundles</groupId>
+ <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+ <version>2.1.0_1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>2.1.0</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java
new file mode 100644
index 0000000..a638815
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+/**
+ * Metatype configuration definition for the Kafka endpoint.
+ * An instance is handed to {@code KafkaMessaging#activate}, which reads
+ * {@link #kafkaBootstrapServers()} to configure its Kafka producer and consumers.
+ */
+@ObjectClassDefinition(name = "Apache Aries Events - Apache Kafka endpoint",
+ description = "Apache Kafka endpoint")
+public @interface KafkaEndpoint {
+
+ /**
+ * Comma separated host/port pairs used for the initial connection to the
+ * Kafka cluster; defaults to {@code localhost:9092}.
+ */
+ @AttributeDefinition(name = "Kafka Bootstrap Servers",
+ description = "A comma separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.")
+ String kafkaBootstrapServers() default "localhost:9092";
+
+
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java
new file mode 100644
index 0000000..7f97d3e
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.Type;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.Designate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.Long.parseLong;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singleton;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+/**
+ * {@link Messaging} implementation backed by Apache Kafka.
+ * <p>
+ * Every topic is written to and read from the single partition {@code PARTITION}.
+ * The producer is created lazily on the first {@link #send} and then shared;
+ * each subscription gets its own consumer and its own daemon thread.
+ */
+@Type("kafka")
+@Component(service = Messaging.class, configurationPolicy = ConfigurationPolicy.REQUIRE)
+@Designate(ocd = KafkaEndpoint.class)
+public class KafkaMessaging implements Messaging {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessaging.class);
+
+    /**
+     * The partition to send and receive records.
+     */
+    private static final int PARTITION = 0;
+
+    /**
+     * Shared Kafka producer instance ({@code KafkaProducer}s are thread-safe).
+     * Created lazily by {@link #kafkaProducer()}.
+     */
+    private KafkaProducer<String, byte[]> producer;
+
+    /** Read-only producer settings assembled in {@link #activate}. */
+    private Map<String, Object> producerConfig;
+
+    /** Endpoint configuration received in {@link #activate}. */
+    private KafkaEndpoint endPoint;
+
+    /**
+     * Stores the endpoint and prepares the producer configuration.
+     * No connection is opened here; the producer is created on first use.
+     *
+     * @param endPoint the configuration providing the bootstrap servers
+     */
+    @Activate
+    public void activate(KafkaEndpoint endPoint) {
+        this.endPoint = endPoint;
+        Map<String, Object> config = new HashMap<>();
+        config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        config.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        config.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
+        // We favour durability over throughput
+        // and thus requires full acknowledgment
+        // from replica leader and followers.
+        config.put(ACKS_CONFIG, "all");
+        producerConfig = unmodifiableMap(config);
+    }
+
+    /**
+     * Closes the shared producer, if one was created.
+     * Synchronized on the same monitor as {@link #kafkaProducer()} so that the
+     * lazily created producer is reliably visible to the deactivating thread.
+     */
+    @Deactivate
+    public synchronized void deactivate() {
+        closeQuietly(producer);
+    }
+
+    /**
+     * Sends the message to the given topic and blocks until the send completed.
+     *
+     * @param topic the topic to send to
+     * @param message the payload and properties to send
+     * @throws RuntimeException if the send fails or the calling thread is
+     *         interrupted while waiting; the interrupt flag is restored in that case
+     */
+    @Override
+    public void send(String topic, Message message) {
+        // Explicit type arguments are kept on purpose: they were in the original code and
+        // pin which five-argument ProducerRecord constructor is meant when the key is null.
+        ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(
+                topic, PARTITION, null, message.getPayload(), toHeaders(message.getProperties()));
+        try {
+            // Wait for the result so the message is persisted when this method returns.
+            RecordMetadata metadata = kafkaProducer().send(record).get();
+            LOG.info("Sent to {}", metadata);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(format("Failed to send message on topic %s", topic), e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(format("Failed to send message on topic %s", topic), e);
+        }
+    }
+
+    /**
+     * Creates a consumer for the requested topic, positions it and starts a
+     * daemon thread that feeds every received record to the request's callback.
+     * The start position is the request's position when set, otherwise the
+     * beginning or end of the partition depending on the request's seek.
+     *
+     * @param requestBuilder describes topic, callback, position and seek
+     * @return the running subscription; closing it stops the consumption
+     * @throws IllegalArgumentException if the request's position is not a {@link KafkaPosition}
+     */
+    @Override
+    public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
+        SubscribeRequest request = requestBuilder.build();
+        KafkaConsumer<String, byte[]> consumer = buildKafkaConsumer(request.getSeek());
+
+        KafkaSubscription subscription;
+        try {
+            TopicPartition topicPartition = new TopicPartition(request.getTopic(), PARTITION);
+            Collection<TopicPartition> topicPartitions = singleton(topicPartition);
+            consumer.assign(topicPartitions);
+
+            if (request.getPosition() != null) {
+                consumer.seek(topicPartition, asKafkaPosition(request.getPosition()).getOffset());
+            } else if (request.getSeek() == Seek.earliest) {
+                consumer.seekToBeginning(topicPartitions);
+            } else {
+                consumer.seekToEnd(topicPartitions);
+            }
+
+            subscription = new KafkaSubscription(consumer, request.getCallback());
+        } catch (RuntimeException e) {
+            // Nobody owns the consumer yet, so release it here instead of leaking it.
+            try {
+                consumer.close();
+            } catch (RuntimeException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+
+        // From here on the subscription's run() is responsible for closing the consumer.
+        // TODO pool the threads
+        Thread thread = new Thread(subscription);
+        thread.setDaemon(true);
+        thread.start();
+        return subscription;
+    }
+
+    /**
+     * Parses a position in the {@code partition:offset} form produced by
+     * {@link #positionToString(Position)}.
+     *
+     * @param position the text to parse
+     * @return the parsed position
+     * @throws IllegalArgumentException if the text does not consist of exactly two
+     *         {@code :}-separated numbers
+     */
+    @Override
+    public Position positionFromString(String position) {
+        String[] chunks = position.split(":");
+        if (chunks.length != 2) {
+            throw new IllegalArgumentException(format("Illegal position format %s", position));
+        }
+        return new KafkaPosition(parseInt(chunks[0]), parseLong(chunks[1]));
+    }
+
+    /**
+     * Renders a Kafka position as {@code partition:offset}.
+     *
+     * @throws IllegalArgumentException if the position is not a {@link KafkaPosition}
+     */
+    static String positionToString(Position position) {
+        KafkaPosition kafkaPosition = asKafkaPosition(position);
+        return format("%s:%s", kafkaPosition.getPartition(), kafkaPosition.getOffset());
+    }
+
+    /** Converts message properties to Kafka record headers, one header per entry. */
+    static Iterable<Header> toHeaders(Map<String, String> properties) {
+        return properties.entrySet().stream()
+                .map(KafkaMessaging::toHeader)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Converts Kafka record headers to message properties, decoding values as UTF-8.
+     * Kafka allows several headers with the same key; the last one wins here,
+     * instead of failing the whole conversion on the duplicate.
+     */
+    static Map<String, String> toProperties(Headers headers) {
+        // Sequential on purpose: header order defines which duplicate wins.
+        return stream(headers.spliterator(), false)
+                .collect(Collectors.toMap(
+                        Header::key,
+                        header -> new String(header.value(), UTF_8),
+                        (previous, latest) -> latest));
+    }
+
+    /** Converts one property to a record header with a UTF-8 encoded value. */
+    static RecordHeader toHeader(Map.Entry<String, String> property) {
+        return new RecordHeader(property.getKey(), property.getValue().getBytes(UTF_8));
+    }
+
+    /** Builds a message from the value and headers of a consumed record. */
+    static Message toMessage(ConsumerRecord<String, byte[]> record) {
+        return new Message(record.value(), toProperties(record.headers()));
+    }
+
+
+    /** Returns the shared producer, creating it on first use. */
+    private synchronized KafkaProducer<String, byte[]> kafkaProducer() {
+        if (producer == null) {
+            producer = new KafkaProducer<>(producerConfig);
+        }
+        return producer;
+    }
+
+    /**
+     * Creates a consumer with a random group id and auto commit disabled.
+     *
+     * @param seek its name is used as the consumer's offset reset setting
+     */
+    private KafkaConsumer<String, byte[]> buildKafkaConsumer(Seek seek) {
+
+        String groupId = UUID.randomUUID().toString();
+
+        Map<String, Object> consumerConfig = new HashMap<>();
+        consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
+        consumerConfig.put(GROUP_ID_CONFIG, groupId);
+        consumerConfig.put(ENABLE_AUTO_COMMIT_CONFIG, false);
+        consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerConfig.put(AUTO_OFFSET_RESET_CONFIG, seek.name());
+
+        return new KafkaConsumer<>(unmodifiableMap(consumerConfig));
+    }
+
+
+    /** Closes the given resource if present; an {@code IOException} is logged, not propagated. */
+    private void closeQuietly(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException ignored) {
+                // Best effort: nothing useful can be done about a failed close.
+                LOG.debug("Ignoring failure while closing {}", closeable, ignored);
+            }
+        }
+    }
+
+    /**
+     * Narrows a generic position to the Kafka specific one.
+     *
+     * @throws IllegalArgumentException if the position is {@code null} or of another type
+     */
+    private static KafkaPosition asKafkaPosition(Position position) {
+        if (!(position instanceof KafkaPosition)) {
+            throw new IllegalArgumentException(format("Position %s must be an instance of %s", position, KafkaPosition.class.getCanonicalName()));
+        }
+        return (KafkaPosition) position;
+    }
+
+
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java
new file mode 100644
index 0000000..7e36216
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import javax.annotation.Nonnull;
+
+import org.apache.aries.events.api.Position;
+
+/**
+ * A {@link Position} made of a Kafka partition number and an offset.
+ */
+public final class KafkaPosition implements Position {
+
+    private final int partition;
+    private final long offset;
+
+    public KafkaPosition(int partition, long offset) {
+        this.partition = partition;
+        this.offset = offset;
+    }
+
+    public int getPartition() {
+        return this.partition;
+    }
+
+    public long getOffset() {
+        return this.offset;
+    }
+
+    /**
+     * Same text as {@link #positionToString()}.
+     */
+    @Override
+    public String toString() {
+        return positionToString();
+    }
+
+    /**
+     * Delegates the rendering to {@code KafkaMessaging.positionToString}.
+     */
+    @Override
+    public String positionToString() {
+        return KafkaMessaging.positionToString(this);
+    }
+
+    /**
+     * Orders positions by offset only; the partition does not take part in
+     * the comparison.
+     *
+     * @throws ClassCastException if the other position is not a {@code KafkaPosition}
+     */
+    @Override
+    public int compareTo(@Nonnull Position p) {
+        final KafkaPosition other = (KafkaPosition) p;
+        return Long.compare(this.offset, other.offset);
+    }
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java
new file mode 100644
index 0000000..eaf94c6
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+import static java.time.Duration.ofHours;
+import static java.util.Objects.requireNonNull;
+import static org.apache.aries.events.kafka.KafkaMessaging.toMessage;
+
+/**
+ * A running subscription: polls a Kafka consumer in a loop and hands every
+ * record to the callback until {@link #close()} is called.
+ */
+public class KafkaSubscription implements Subscription, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSubscription.class);
+
+    /** Cleared by {@link #close()} to make the polling loop stop. */
+    private volatile boolean running = true;
+
+    private final KafkaConsumer<String, byte[]> consumer;
+
+    private final Consumer<Received> callback;
+
+    /**
+     * @param consumer the consumer to poll; closed when {@link #run()} ends
+     * @param callback invoked for each received record
+     * @throws NullPointerException if either argument is {@code null}
+     */
+    public KafkaSubscription(KafkaConsumer<String, byte[]> consumer, Consumer<Received> callback) {
+        this.consumer = requireNonNull(consumer);
+        this.callback = requireNonNull(callback);
+    }
+
+    /**
+     * Polls until the subscription is closed. A wakeup while stopping ends the
+     * loop quietly; any other failure is logged and rethrown. The consumer is
+     * closed in every case.
+     */
+    @Override
+    public void run() {
+        try {
+            while (running) {
+                ConsumerRecords<String, byte[]> batch = consumer.poll(ofHours(1));
+                for (ConsumerRecord<String, byte[]> record : batch) {
+                    callback.accept(toReceived(record));
+                }
+            }
+        } catch (WakeupException e) {
+            if (!running) {
+                // Expected: close() woke the consumer up.
+                LOG.debug("WakeupException while stopping {}", e.getMessage(), e);
+            } else {
+                LOG.error("WakeupException while running {}", e.getMessage(), e);
+                throw e;
+            }
+        } catch (Throwable t) {
+            String text = format("Catch Throwable %s closing subscription", t.getMessage());
+            LOG.error(text, t);
+            throw t;
+        } finally {
+            // Close the network connections and sockets
+            consumer.close();
+        }
+    }
+
+    /**
+     * Asks the polling loop to stop and wakes the consumer up.
+     */
+    @Override
+    public void close() {
+        running = false;
+        consumer.wakeup();
+    }
+
+    /** Wraps a record as a received message together with its Kafka position. */
+    private Received toReceived(ConsumerRecord<String, byte[]> record) {
+        return new Received(
+                new KafkaPosition(record.partition(), record.offset()),
+                toMessage(record));
+    }
+
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java
new file mode 100644
index 0000000..b37951b
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.kafka.setup.KafkaBaseTest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static java.nio.charset.Charset.forName;
+import static java.util.Collections.singletonMap;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link KafkaMessaging}: position parsing and a send/receive
+ * round trip against the Kafka instance provided by {@link KafkaBaseTest}.
+ */
+public class KafkaMessagingTest extends KafkaBaseTest {
+
+    @Test
+    public void testPositionFromString() throws Exception {
+        Messaging messaging = new KafkaMessaging();
+        KafkaPosition kafkaPosition = (KafkaPosition) messaging.positionFromString("0:100");
+        assertEquals(0, kafkaPosition.getPartition());
+        assertEquals(100, kafkaPosition.getOffset());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPositionFromStringIllegalArgument() throws Exception {
+        Messaging messaging = new KafkaMessaging();
+        messaging.positionFromString("0:100:23");
+    }
+
+    /**
+     * Sends one message, subscribes from offset 0 and requires that the
+     * callback is actually invoked.
+     */
+    @Test(timeout = 10000)
+    public void testSendAndReceive() throws Exception {
+
+        String topic = "test_send_and_receive";
+        createTopic(topic, 1);
+
+        KafkaEndpoint kafkaEndpoint = Mockito.mock(KafkaEndpoint.class);
+        when(kafkaEndpoint.kafkaBootstrapServers())
+                .thenReturn(getKafkaLocal().getKafkaBootstrapServer());
+        KafkaMessaging messaging = new KafkaMessaging();
+        messaging.activate(kafkaEndpoint);
+
+        try {
+            byte[] payload = "test".getBytes(forName("UTF-8"));
+
+            Message message = new Message(payload, singletonMap("prop1", "value1"));
+            messaging.send(topic, message);
+
+            Semaphore invoked = new Semaphore(0);
+
+            SubscribeRequestBuilder requestBuilder = SubscribeRequestBuilder
+                    .to(topic, (received) -> invoked.release())
+                    .startAt(new KafkaPosition(0, 0));
+
+            try (Subscription subscription = messaging.subscribe(requestBuilder)) {
+                // Without checking the result the test would pass even if nothing was received.
+                org.junit.Assert.assertTrue("Callback was not invoked for the sent message",
+                        invoked.tryAcquire(10, TimeUnit.SECONDS));
+            }
+        } finally {
+            // Release the producer even when the test fails.
+            messaging.deactivate();
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaPositionTest.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaPositionTest.java
new file mode 100644
index 0000000..3acd699
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaPositionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.aries.events.api.Position;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KafkaPositionTest {
+
+ private static final Random RAND = new Random();
+
+ @Test
+ public void testGetPartition() throws Exception {
+ assertEquals(10, new KafkaPosition(10, 1000).getPartition());
+ }
+
+ @Test
+ public void testGetOffset() throws Exception {
+ assertEquals(1000, new KafkaPosition(10, 1000).getOffset());
+ }
+
+ @Test
+ public void testPositionToString() throws Exception {
+ assertEquals("10:1000", new KafkaPosition(10, 1000).positionToString());
+ }
+
+ @Test
+ public void testCompareTo() throws Exception { // partitions are random, offsets fixed: the expectations only hold if ordering uses the offset alone
+ assertEquals(0, comparePositions(position(RAND.nextInt(), 5), position(RAND.nextInt(), 5)));
+ assertEquals(1, comparePositions(position(RAND.nextInt(), 10), position(RAND.nextInt(), 5)));
+ assertEquals(-1, comparePositions(position(RAND.nextInt(), 2), position(RAND.nextInt(), 5)));
+ }
+
+ @Test
+ public void testOrder() {
+ NavigableMap<Position, String> positions = new TreeMap<>();
+ positions.put(new KafkaPosition(0, 0), "earliest");
+ positions.put(new KafkaPosition(0, 1), "mid");
+ positions.put(new KafkaPosition(0, 2), "latest");
+ assertEquals("earliest", positions.firstEntry().getValue());
+ assertEquals("latest", positions.lastEntry().getValue());
+ }
+
+ private int comparePositions(KafkaPosition position1, KafkaPosition position2) {
+ return position1.compareTo(position2);
+ }
+
+ private KafkaPosition position(int partition, long offset) {
+ return new KafkaPosition(partition, offset);
+ }
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaBaseTest.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaBaseTest.java
new file mode 100644
index 0000000..61763b6
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaBaseTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka.setup;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.createTempDirectory;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.aries.events.kafka.setup.KafkaLocal.getKafkaProperties;
+import static org.apache.kafka.clients.admin.AdminClient.create;
+import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
+
+public class KafkaBaseTest {
+
+ private static KafkaLocal kafkaLocal;
+
+ private static ZooKeeperLocal zooKeeperLocal;
+
+ private static Logger LOG = LoggerFactory.getLogger(KafkaBaseTest.class);
+
+ @BeforeClass
+ public static void startKafka() throws IOException {
+ // Start ZooKeeper first: the Kafka broker created below is configured to connect to it via zkConnect.
+ int zkPort = randomAvailablePort();
+ String zkDir = createTempDirectory("zk").toString();
+ String zkConnect = format("127.0.0.1:%s", zkPort);
+ zooKeeperLocal = new ZooKeeperLocal(ZooKeeperLocal.getZooKeeperProperties(zkDir, zkPort));
+ LOG.info(format("Started local ZooKeeper server on port %s and dataDirectory %s", zkPort, zkDir));
+ // Start the Kafka broker on its own free port with a fresh temporary log directory.
+ int kafkaPort = randomAvailablePort();
+ String kafkaLogDir = createTempDirectory("kafka").toString();
+ kafkaLocal = new KafkaLocal(getKafkaProperties(kafkaLogDir, kafkaPort, zkConnect));
+ LOG.info(format("Started local Kafka on port %s and logDirectory %s", kafkaPort, kafkaLogDir));
+ }
+
+ @AfterClass
+ public static void shutdownKafka() {
+ if (kafkaLocal != null) {
+ kafkaLocal.stop();
+ }
+ if (zooKeeperLocal != null) {
+ zooKeeperLocal.stop();
+ }
+ }
+
+ public static KafkaLocal getKafkaLocal() {
+ return kafkaLocal;
+ }
+
+
+ public Set<String> listTopics() {
+ try (AdminClient admin = buildAdminClient()) {
+ ListTopicsResult result = admin.listTopics();
+ return result.names().get();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to list topics", e);
+ }
+ }
+
+ public void createTopic(String topicName, int numPartitions) {
+ NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
+ try (AdminClient admin = buildAdminClient()) {
+ CreateTopicsResult result = admin.createTopics(singletonList(newTopic));
+ result.values().get(topicName).get();
+ LOG.info(format("created topic %s", topicName));
+ } catch (Exception e) {
+ throw new RuntimeException(format("Failed to create topic %s", topicName), e);
+ }
+ }
+
+ public void deleteTopic(String topicName) {
+ try (AdminClient admin = buildAdminClient()) {
+ DeleteTopicsResult result = admin.deleteTopics(Collections.singleton(topicName));
+ result.all().get();
+ LOG.info(format("deleted topic %s", topicName));
+ } catch (Exception e) {
+ throw new RuntimeException(format("Failed to delete topic %s", topicName), e);
+ }
+ }
+
+ private static int randomAvailablePort() { // returns a port number that was free at the time of the probe
+ try (ServerSocket ss = new ServerSocket(0)) { // port 0 lets the system pick any free port
+ return ss.getLocalPort(); // the probe socket is closed on return, so the port is released before the caller binds it
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private AdminClient buildAdminClient() {
+ return create(singletonMap(BOOTSTRAP_SERVERS_CONFIG, kafkaLocal.getKafkaBootstrapServer()));
+ }
+
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaLocal.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaLocal.java
new file mode 100644
index 0000000..e55e136
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaLocal.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka.setup;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+import static java.lang.String.format;
+
+public class KafkaLocal {
+
+ private final KafkaServerStartable server;
+
+ private final String kafkaBootstrapServer;
+
+ public KafkaLocal(Map<String, Object> kafkaProperties) { // builds the broker config from the given properties and starts the broker
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+ kafkaBootstrapServer = format("%s:%s", kafkaConfig.hostName(), kafkaConfig.port()); // "host:port" exposed via getKafkaBootstrapServer()
+ server = new KafkaServerStartable(kafkaConfig);
+ server.startup();
+ }
+
+ public void stop() {
+ server.shutdown();
+ }
+
+ public String getKafkaBootstrapServer() {
+ return kafkaBootstrapServer;
+ }
+
+ public static Map<String, Object> getKafkaProperties(String logDir, int port, String zkConnect) {
+ Map<String, Object> props = new HashMap<>();
+ props.put("host.name", "localhost");
+ props.put("log.dir", logDir);
+ props.put("port", port);
+ props.put("zookeeper.connect", zkConnect);
+ return props;
+ }
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/ZooKeeperLocal.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/ZooKeeperLocal.java
new file mode 100644
index 0000000..ea479ff
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/ZooKeeperLocal.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka.setup;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+public class ZooKeeperLocal {
+
+ private final ZooKeeperServerMain server;
+
+ public ZooKeeperLocal(Properties zkProperties) { // parses the properties and runs a ZooKeeper server on a daemon thread
+ QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
+ try {
+ quorumPeerConfig.parseProperties(zkProperties);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ ServerConfig serverConfig = new ServerConfig();
+ serverConfig.readFrom(quorumPeerConfig);
+
+ server = new ZooKeeperServerMain();
+ // NOTE(review): the constructor returns right after starting the thread; nothing here waits for the server to be ready.
+ Thread dt = new Thread(runnable(serverConfig));
+ dt.setDaemon(true);
+ dt.start();
+ }
+
+ public void stop() { // stops the server by reflectively invoking shutdown() on the ZooKeeperServerMain instance
+ try {
+ Method shutdown = server.getClass().getDeclaredMethod("shutdown");
+ shutdown.setAccessible(true); // required to call shutdown() from this class
+ shutdown.invoke(server);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to shut down local ZooKeeper server", e); // keep the cause instead of dropping it
+ }
+ }
+
+ public static Properties getZooKeeperProperties(String dataDirectory, int port) {
+ Properties props = new Properties();
+ props.put("dataDir", dataDirectory);
+ props.put("clientPort", port);
+ return props;
+ }
+
+ private Runnable runnable(ServerConfig serverConfig) {
+ return () -> {
+ try {
+ server.runFromConfig(serverConfig);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
index b888272..674864a 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/InMemoryMessaging.java
@@ -19,13 +19,12 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Messaging;
import org.apache.aries.events.api.Position;
-import org.apache.aries.events.api.Received;
-import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.apache.aries.events.api.Type;
import org.osgi.service.component.annotations.Component;
@@ -33,33 +32,39 @@
@Component
@Type("memory")
public class InMemoryMessaging implements Messaging {
- private Map<String, Topic> topics = new ConcurrentHashMap<>();
+ private final Map<String, Topic> topics = new ConcurrentHashMap<>();
+ private final int keepAtLeast;
+
+ public InMemoryMessaging() {
+ this(10000);
+ }
- @Override
- public Position send(String topicName, Message message) {
- Topic topic = getOrCreate(topicName);
- return topic.send(message);
+ public InMemoryMessaging(int keepAtLeast) {
+ this.keepAtLeast = keepAtLeast;
+
}
@Override
- public Subscription subscribe(String topicName, Position position, Seek seek, Consumer<Received> callback) {
+ public void send(String topicName, Message message) {
Topic topic = getOrCreate(topicName);
- return topic.subscribe(position, seek, callback);
+ topic.send(message);
}
@Override
- public Message newMessage(byte[] payload, Map<String, String> props) {
- return new MemoryMessage(payload, props);
+ public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
+ SubscribeRequest request = requestBuilder.build();
+ Topic topic = getOrCreate(request.getTopic());
+ return topic.subscribe(request);
}
@Override
public Position positionFromString(String position) {
- long offset = new Long(position).longValue();
+ long offset = Long.parseLong(position);
return new MemoryPosition(offset);
}
private Topic getOrCreate(String topicName) {
- return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2));
+ return topics.computeIfAbsent(topicName, topicName2 -> new Topic(topicName2, keepAtLeast));
}
}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
index 4fb5d0a..15c9585 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Journal.java
@@ -17,22 +17,40 @@
*/
package org.apache.aries.events.memory;
+import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
-public class Journal<T> {
- private AtomicLong nextOffset = new AtomicLong();
- private ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+class Journal<T> {
+ private final int keepAtLeast;
+ private final AtomicLong nextOffset = new AtomicLong();
+ private final ConcurrentNavigableMap<Long, T> messages = new ConcurrentSkipListMap<>();
+ private final AtomicLong count = new AtomicLong();
+
+ public Journal(int keepAtLeast) {
+ this.keepAtLeast = keepAtLeast;
+ }
public long append(T message) {
+ if (count.incrementAndGet() > keepAtLeast * 2) {
+ evict();
+ }
Long offset = nextOffset.getAndIncrement();
messages.put(offset, message);
return offset;
}
+ private synchronized void evict() { // drops the keepAtLeast oldest entries once append() sees more than 2 * keepAtLeast
+ Iterator<Long> it = messages.keySet().iterator(); // skip-list keys iterate in ascending offset order, oldest first
+ for (int c = 0; c < keepAtLeast; c++) {
+ messages.remove(it.next());
+ }
+ count.addAndGet(-keepAtLeast); // subtract what was removed; resetting to 0 ignored the survivors and let the journal grow unbounded
+ }
+
public long getFirstOffset() {
try {
return messages.firstKey();
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
index 694c42d..fab1bea 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryPosition.java
@@ -27,12 +27,17 @@
this.offset = offset;
}
- public long getOffset() {
+ long getOffset() {
return offset;
}
@Override
- public String toString() {
- return new Long(offset).toString();
+ public String positionToString() {
+ return Long.toString(offset);
+ }
+
+ @Override
+ public int compareTo(Position p) {
+ return Long.compare(offset, ((MemoryPosition)p).offset);
}
}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index 02bea04..91a6d71 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -17,106 +17,107 @@
*/
package org.apache.aries.events.memory;
-import java.util.HashSet;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Topic {
- private Logger log = LoggerFactory.getLogger(this.getClass());
+class Topic {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
- private String topicName;
- private Journal<Message> journal;
- private Set<Subscription> subscriptions = new HashSet<>();
+ private final String topicName;
+ private final Journal<Message> journal;
- public Topic(String topicName) {
+ public Topic(String topicName, int keepAtLeast) {
this.topicName = topicName;
- this.journal = new Journal<>();
+ this.journal = new Journal<>(keepAtLeast);
}
- public Position send(Message message) {
+ public synchronized Position send(Message message) {
long offset = this.journal.append(message);
+ notifyAll();
return new MemoryPosition(offset);
}
- public Subscription subscribe(Position position, Seek seek, Consumer<Received> callback) {
- long startOffset = getStartOffset(position, seek);
+ public Subscription subscribe(SubscribeRequest request) {
+ long startOffset = getStartOffset((MemoryPosition) request.getPosition(), request.getSeek());
log.debug("Consuming from " + startOffset);
- return new TopicSubscription(startOffset, callback);
+ return new TopicSubscription(startOffset, request.getCallback());
}
- private long getStartOffset(Position position, Seek seek) {
+ private long getStartOffset(MemoryPosition position, Seek seek) {
if (position != null) {
return position.getOffset();
} else {
if (seek == Seek.earliest) {
return this.journal.getFirstOffset();
- } else if (seek == Seek.latest) {
- return this.journal.getLastOffset() + 1;
} else {
- throw new IllegalArgumentException("Seek must not be null");
+ return this.journal.getLastOffset() + 1;
}
}
}
+ private synchronized Entry<Long, Message> waitNext(long currentOffset) throws InterruptedException {
+ Entry<Long, Message> entry = journal.getNext(currentOffset);
+ if (entry != null) {
+ return entry;
+ }
+ log.debug("Waiting for next message");
+ wait();
+ return journal.getNext(currentOffset);
+ }
+
class TopicSubscription implements Subscription {
private Consumer<Received> callback;
private ExecutorService executor;
- private volatile boolean running;
private long currentOffset;
TopicSubscription(long startOffset, Consumer<Received> callback) {
this.currentOffset = startOffset;
this.callback = callback;
- this.running = true;
String name = "Poller for " + topicName;
this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
this.executor.execute(this::poll);
}
-
+
private void poll() {
- while (running) {
- Entry<Long, Message> entry = journal.getNext(currentOffset);
- if (entry != null) {
- long offset = entry.getKey();
- try {
- MemoryPosition position = new MemoryPosition(this.currentOffset);
- Received received = new Received(position, entry.getValue());
- callback.accept(received);
- } catch (Exception e) {
- log.warn(e.getMessage(), e);
- }
- this.currentOffset = offset + 1;
- } else {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
+ try {
+ while (true) {
+ Entry<Long, Message> entry = waitNext(currentOffset);
+ if (entry != null) {
+ handleMessage(entry);
}
}
+ } catch (InterruptedException e) {
+ log.debug("Poller thread for consumer on topic " + topicName + " stopped.");
}
}
+ private void handleMessage(Entry<Long, Message> entry) {
+ long offset = entry.getKey();
+ try {
+ MemoryPosition position = new MemoryPosition(this.currentOffset);
+ Received received = new Received(position, entry.getValue());
+ callback.accept(received);
+ } catch (Exception e) {
+ log.warn(e.getMessage(), e);
+ }
+ this.currentOffset = offset + 1;
+ }
+
@Override
public void close() {
- this.running = false;
executor.shutdown();
- try {
- executor.awaitTermination(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- // Ignore
- }
- subscriptions.remove(this);
+ executor.shutdownNow();
}
}
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MemoryPositionTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MemoryPositionTest.java
new file mode 100644
index 0000000..7613d2c
--- /dev/null
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MemoryPositionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.aries.events.memory;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MemoryPositionTest {
+
+ @Test
+ public void testCompareTo() throws Exception {
+ assertEquals(0, comparePositions(position(5), position(5)));
+ assertEquals(1, comparePositions(position(10), position(5)));
+ assertEquals(-1, comparePositions(position(2), position(5)));
+ }
+
+ private int comparePositions(MemoryPosition position1, MemoryPosition position2) {
+ return position1.compareTo(position2);
+ }
+
+ private MemoryPosition position(long offset) {
+ return new MemoryPosition(offset);
+ }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
index a8262bd..5edee5d 100644
--- a/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
+++ b/org.apache.aries.events.memory/src/test/java/org/apache/aries/events/memory/MessagingTest.java
@@ -1,8 +1,12 @@
package org.apache.aries.events.memory;
+import static org.apache.aries.events.api.SubscribeRequestBuilder.to;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
@@ -13,6 +17,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -21,6 +26,7 @@
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Received;
import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
import org.apache.aries.events.api.Subscription;
import org.junit.After;
import org.junit.Before;
@@ -28,9 +34,12 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
+import org.mockito.Mockito;
public class MessagingTest {
+ private static final long MAX_MANY = 100000l;
+
@Mock
private Consumer<Received> callback;
@@ -55,62 +64,69 @@
@Test
public void testPositionFromString() {
Position pos = messaging.positionFromString("1");
- assertThat(pos.getOffset(), equalTo(1l));
+ assertThat(pos.compareTo(new MemoryPosition(1)), equalTo(0));
+ assertThat(pos.positionToString(), equalTo("1"));
}
@Test
public void testSend() {
- subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ subscribe(to("test", callback).seek(Seek.earliest));
String content = "testcontent";
- Position pos = send("test", content);
- assertThat(pos.toString(), equalTo("0"));
-
- verify(callback, timeout(1000)).accept(messageCaptor.capture());
+ send("test", content);
+ assertMessages(1);
Received received = messageCaptor.getValue();
assertThat(received.getMessage().getPayload(), equalTo(toBytes(content)));
- assertThat(received.getPosition().getOffset(), equalTo(0l));
+ assertEquals(0, received.getPosition().compareTo(new MemoryPosition(0)));
assertThat(received.getMessage().getProperties().size(), equalTo(1));
assertThat(received.getMessage().getProperties().get("my"), equalTo("testvalue"));
}
- @Test(expected=IllegalArgumentException.class)
- public void testInvalid() {
- messaging.subscribe("test", null, null, callback);
+ @Test(expected=NullPointerException.class)
+ public void testInvalidSubscribe() {
+ subscribe(to("test", callback).seek(null));
+ }
+
+ @Test
+ public void testExceptionInHandler() {
+ doThrow(new RuntimeException("Expected exception")).when(callback).accept(Mockito.any(Received.class));
+ subscribe(to("test", callback));
+ send("test", "testcontent");
+ assertMessages(1);
}
@Test
public void testEarliestBefore() {
- subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ subscribe(to("test", callback).seek(Seek.earliest));
send("test", "testcontent");
send("test", "testcontent2");
- verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+ assertMessages(2);
assertThat(messageContents(), contains("testcontent", "testcontent2"));
}
-
+
@Test
public void testEarliestAfter() {
send("test", "testcontent");
- subscriptions.add(messaging.subscribe("test", null, Seek.earliest, callback));
+ subscribe(to("test", callback).seek(Seek.earliest));
send("test", "testcontent2");
- verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+ assertMessages(2);
assertThat(messageContents(), contains("testcontent", "testcontent2"));
}
@Test
public void testLatestBefore() {
- subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+ subscribe(to("test", callback));
send("test", "testcontent");
send("test", "testcontent2");
- verify(callback, timeout(1000).times(2)).accept(messageCaptor.capture());
+ assertMessages(2);
assertThat(messageContents(), contains("testcontent", "testcontent2"));
}
@Test
public void testLatest() {
send("test", "testcontent");
- subscriptions.add(messaging.subscribe("test", null, Seek.latest, callback));
+ subscribe(to("test", callback));
send("test", "testcontent2");
- verify(callback, timeout(1000)).accept(messageCaptor.capture());
+ assertMessages(1);
assertThat(messageContents(), contains("testcontent2"));
}
@@ -118,10 +134,33 @@
public void testFrom1() {
send("test", "testcontent");
send("test", "testcontent2");
- subscriptions.add(messaging.subscribe("test", new MemoryPosition(1l), Seek.earliest, callback));
- verify(callback, timeout(1000)).accept(messageCaptor.capture());
+ subscribe(to("test", callback).startAt(new MemoryPosition(1l)).seek(Seek.earliest));
+ assertMessages(1);
assertThat(messageContents(), contains("testcontent2"));
}
+
+ @Test
+ public void testMany() {
+ AtomicLong count = new AtomicLong();
+ Consumer<Received> manyCallback = rec -> { count.incrementAndGet(); };
+ // Register through the subscribe(...) helper so the subscription is tracked in this.subscriptions like in the other tests.
+ subscribe(to("test", manyCallback));
+ for (long c=0; c < MAX_MANY; c++) {
+ send("test", "content " + c);
+ if (c % 10000 == 0) {
+ System.out.println("Sending " + c);
+ }
+ }
+ await().until(count::get, equalTo(MAX_MANY));
+ }
+
+ private void assertMessages(int num) {
+ verify(callback, timeout(1000).times(num)).accept(messageCaptor.capture());
+ }
+
+ private void subscribe(SubscribeRequestBuilder request) {
+ this.subscriptions.add(messaging.subscribe(request));
+ }
private List<String> messageContents() {
return messageCaptor.getAllValues().stream()
@@ -132,11 +171,11 @@
return new String(rec.getMessage().getPayload(), Charset.forName("UTF-8"));
}
- private Position send(String topic, String content) {
+ private void send(String topic, String content) {
Map<String, String> props = new HashMap<String, String>();
props.put("my", "testvalue");
- Message message = messaging.newMessage(toBytes(content), props);
- return messaging.send(topic, message);
+ Message message = new Message(toBytes(content), props);
+ messaging.send(topic, message);
}
private byte[] toBytes(String content) {
diff --git a/org.apache.aries.events.mongo/pom.xml b/org.apache.aries.events.mongo/pom.xml
new file mode 100644
index 0000000..34cb08b
--- /dev/null
+++ b/org.apache.aries.events.mongo/pom.xml
@@ -0,0 +1,43 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.events</groupId>
+ <artifactId>org.apache.aries.events</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <groupId>org.apache.aries.events.mongo</groupId>
+ <artifactId>org.apache.aries.events.mongo</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.aries.events</groupId>
+ <artifactId>org.apache.aries.events.api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>3.8.2</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/CachingFactory.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/CachingFactory.java
new file mode 100644
index 0000000..5b7fdc6
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/CachingFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A factory that remembers every instance it has produced and hands the
+ * same instance back when it is asked for the same key again.
+ * The cache has no size limit, so this implementation is only suitable
+ * when the number of distinct keys is small.
+ * @param <K> key type. Serves as the cache key as well as the argument of the
+ *            creating function. Must provide sensible implementations of
+ *            equals and hashCode
+ * @param <V> result type.
+ */
+public final class CachingFactory<K, V extends AutoCloseable> implements Closeable {
+
+    /**
+     * Creates a caching factory around the specified creating function
+     * @param create function invoked to produce the value for a key that is not cached yet
+     * @return a new factory with an empty cache
+     */
+    public static <K2, V2 extends AutoCloseable> CachingFactory<K2, V2> cachingFactory(Function<K2, V2> create) {
+        return new CachingFactory<>(create);
+    }
+
+    /**
+     * Finds or creates the value for the specified key
+     * @param arg key instance
+     * @return either the existing (cached) value or a newly created one.
+     */
+    public synchronized V get(K arg) {
+        return cache.computeIfAbsent(arg, create);
+    }
+
+    /**
+     * Closes every cached instance and then empties the cache.
+     * A failure to close one instance is logged and does not prevent
+     * the remaining instances from being closed.
+     */
+    public synchronized void clear() {
+        for (V cached : cache.values()) {
+            safeClose(cached);
+        }
+        cache.clear();
+    }
+
+    /**
+     * Closes this factory, disposing of all cached instances
+     */
+    @Override
+    public void close() {
+        clear();
+    }
+
+    //*********************************************
+    // Private
+    //*********************************************
+
+    private static final Logger LOG = getLogger(CachingFactory.class);
+    private final Map<K, V> cache = new HashMap<>();
+    private final Function<K, V> create;
+
+    private CachingFactory(Function<K, V> create) {
+        this.create = create;
+    }
+
+    /** Closes the resource, logging (rather than propagating) any exception it throws */
+    private static void safeClose(AutoCloseable resource) {
+        try {
+            resource.close();
+        } catch (Exception e) {
+            LOG.warn(e.getMessage(), e);
+        }
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/Common.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/Common.java
new file mode 100644
index 0000000..be5bc76
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/Common.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.client.MongoCollection;
+import org.bson.Document;
+
+import static com.mongodb.client.model.Filters.lte;
+import static com.mongodb.client.model.Indexes.descending;
+import static org.apache.aries.events.mongo.Common.Fields.INDEX;
+
+/**
+ * String constants and a helper shared by the MongoDB sender and receiver
+ */
+@SuppressWarnings({"HardCodedStringLiteral", "InterfaceNeverImplemented"})
+interface Common {
+
+    /** Database name MongoMessaging falls back to when the configured URI names no database */
+    String DEFAULT_DB_NAME = "aem-replication";
+
+    /** MongoDB field names */
+    interface Fields {
+        /** Position of the message in the log */
+        String INDEX = "i";
+        /** Time stamp written by the sender when the document is created */
+        String TIME_STAMP = "t";
+        /** Message payload */
+        String PAYLOAD = "d";
+        /** Message properties */
+        String PROPS = "p";
+    }
+
+    /**
+     * Computes the index the next published message should get.
+     * @param col collection to inspect. The collection must contain
+     *            log messages published by a Publisher instance
+     * @return one more than the highest index found in the collection,
+     *         or 0 when the collection holds no indexed document
+     */
+    static long upcomingIndex(MongoCollection<Document> col) {
+        Document newest = col.find(lte(INDEX, Long.MAX_VALUE))
+                .sort(descending(INDEX))
+                .first();
+        if (newest == null) {
+            return 0L;
+        }
+        long latestAvailable = newest.getLong(INDEX);
+        return latestAvailable + 1L;
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiver.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiver.java
new file mode 100644
index 0000000..1a9626e
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.apache.aries.events.api.Message;
+
+public interface MessageReceiver extends AutoCloseable {
+
+ /** Returns the message stored at the specified index.
+ * If necessary waits until the message is available.
+ * @param index log index of the desired message
+ * @return the requested message
+ * @throws java.util.NoSuchElementException if the entry at the
+ * specified index has been evicted from the log
+ * @throws InterruptedException if interrupted while waiting
+ */
+ Message receive(long index) throws InterruptedException;
+
+ /** Returns the index of the earliest available
+ * data entry. It also causes the receiver to
+ * pre-fetch and cache a batch of the earliest available
+ * entries, thus giving the user a chance to consume
+ * them and catch up before they get evicted.
+ * @return the index of the first available data entry or
+ * 0 if the log is empty
+ */
+ long earliestIndex();
+
+ /** Returns the index of the next available data
+ * entry.
+ * The returned index points to the entry yet to
+ * be inserted into the log.
+ * NOTE(review): MessageReceiverImpl returns the upcoming index
+ * minus one whenever the log is not empty, i.e. the index of the
+ * last stored entry - confirm which contract is intended.
+ * @return index of the data entry that will be
+ * inserted next. 0 if the log is empty.
+ */
+ long latestIndex();
+
+ /** Closes the receiver. Redeclared without a throws clause so that
+ * callers do not have to handle a checked exception.
+ */
+ @Override
+ void close();
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java
new file mode 100644
index 0000000..f03f701
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+import com.mongodb.Mongo;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.mongo.Common.Fields;
+import org.bson.Document;
+import org.bson.types.Binary;
+import org.slf4j.Logger;
+
+import static java.lang.Math.min;
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+import static java.util.Collections.emptyList;
+import static org.apache.aries.events.mongo.Common.Fields.PAYLOAD;
+import static org.apache.aries.events.mongo.Common.Fields.INDEX;
+import static org.apache.aries.events.mongo.Common.upcomingIndex;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * MessageReceiver that reads log entries from a MongoDB collection.
+ * Entries are fetched in batches of consecutive indexes and kept in a local
+ * buffer; receive(long) polls the collection until the requested index is
+ * covered by that buffer.
+ */
+final class MessageReceiverImpl implements MessageReceiver {
+
+    /**
+     * Creates a receiver on top of the specified collection
+     * @param col collection holding the log entries
+     * @return a receiver that does not own a Mongo client
+     */
+    static MessageReceiver messageReceiver(MongoCollection<Document> col) {
+        return new MessageReceiverImpl(col, Optional.empty());
+    }
+
+    /**
+     * Returns the message stored at the specified index, polling the
+     * collection until that index becomes available.
+     * @param index log index of the desired message
+     * @return the message stored at that index
+     * @throws NoSuchElementException if the entry has been evicted from the log
+     * @throws InterruptedException if the receiver gets closed while waiting
+     */
+    @Override
+    public Message receive(long index) throws InterruptedException {
+        fetch(index);
+        long bufferIndex = index - firstIndex;
+        assert bufferIndex < buffer.size() : bufferIndex + ", " + buffer.size();
+        return buffer.get((int) bufferIndex);
+    }
+
+    /**
+     * Loads a batch of the earliest available entries into the buffer
+     * @return index of the first buffered entry, 0 if nothing was found
+     */
+    @Override
+    public long earliestIndex() {
+        refreshBuffer(FIRST_AVAILABLE);
+        return firstIndex;
+    }
+
+    /**
+     * @return the upcoming index of the collection minus one, or 0 when
+     *         the upcoming index itself is 0
+     */
+    @Override
+    public long latestIndex() {
+        long result = upcomingIndex(col);
+        if (result > 0) {
+            result -= 1;
+        }
+        return result;
+    }
+
+    @Override
+    public void close() {
+        // MongoDB driver doesn't like to be interrupted so
+        // we try to get out of the poll loop in a gentle way
+        interrupted = true;
+        mongoClient.ifPresent(Mongo::close);
+    }
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    private static final Logger LOGGER = getLogger(MessageReceiverImpl.class);
+    /** Longest single sleep (ms) between two checks of the interrupted flag */
+    private static final long FINE_GRAINED_DELAY = 100L;
+    /** Pseudo index asking refreshBuffer for whatever entry comes first */
+    private static final long FIRST_AVAILABLE = -1;
+    private final Optional<MongoClient> mongoClient;
+    private final MongoCollection<Document> col;
+    /** Upper bound (ms) for the pause between two polls */
+    private final long maxWaitTime = 1000L;
+    /** Maximum number of entries loaded by a single query */
+    private final int fetchLimit = 100;
+    /** Time of the last poll that returned at least one entry */
+    private long lastReceived = currentTimeMillis();
+    /** Log index of buffer element 0 */
+    private long firstIndex = 0L;
+    private List<Message> buffer = emptyList();
+    /** Set by close() to make the poll loop bail out */
+    private volatile boolean interrupted = false;
+
+    private MessageReceiverImpl(MongoCollection<Document> col, Optional<MongoClient> mongoClient) {
+        LOGGER.debug("Creating new receiver: {}", col.getNamespace().getCollectionName());
+        this.mongoClient = mongoClient;
+        this.col = col;
+    }
+
+    /**
+     * Polls the collection until the buffer covers the specified index.
+     * The pause between polls grows with the time elapsed since entries
+     * were last received, capped at maxWaitTime.
+     */
+    private void fetch(long index) throws InterruptedException {
+        while (firstIndex > index || firstIndex + buffer.size() <= index) {
+            // A full batch suggests that more entries are waiting, so only
+            // back off when the previous query came back short.
+            long delay = buffer.size() >= fetchLimit
+                    ? 0L
+                    : min(maxWaitTime, (currentTimeMillis() - lastReceived) / 2);
+            adaptivePause(delay);
+            refreshBuffer(index);
+        }
+    }
+
+    /**
+     * Replaces the buffer with up to fetchLimit consecutive entries starting
+     * at the specified index (or at the first entry found for FIRST_AVAILABLE).
+     * @throws NoSuchElementException if the first entry found has a different index
+     * @throws IllegalStateException if a gap is found after the first entry
+     */
+    private void refreshBuffer(long index) {
+        long startIndex = index;
+        // The loop below relies on ascending, gap-free indexes, so the order
+        // is requested explicitly and the batch is bounded by fetchLimit.
+        try (MongoCursor<Document> cursor = col.find(Filters.gte(INDEX, startIndex))
+                .sort(new Document(INDEX, 1))
+                .limit(fetchLimit)
+                .iterator()) {
+            List<Message> collected = new ArrayList<>(fetchLimit);
+            while (cursor.hasNext()) {
+                int i = collected.size();
+                Document document = cursor.next();
+                long idx = document.get(INDEX, Long.class);
+                if (startIndex == FIRST_AVAILABLE) {
+                    startIndex = idx;
+                }
+                if (idx == startIndex + i) {
+                    Binary payload = document.get(PAYLOAD, Binary.class);
+                    // Properties are written by the sender from Message.getProperties()
+                    @SuppressWarnings("unchecked")
+                    Map<String, String> props = (Map<String, String>) document.get(Fields.PROPS);
+                    Message message = new Message(payload.getData(), props);
+                    collected.add(message);
+                } else {
+                    if (i == 0) {
+                        throw new NoSuchElementException("Element [" + startIndex + "] has been evicted from the log. Oldest available: [" + idx + "]");
+                    } else {
+                        throw new IllegalStateException("Missing element at [" + (startIndex + i) + "]. Next available at [" + idx + "]");
+                    }
+                }
+            }
+            buffer = collected;
+            firstIndex = (startIndex == FIRST_AVAILABLE) ? 0L : startIndex;
+            if (collected.size() > 0) {
+                lastReceived = currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * Sleeps for the specified time in slices of at most FINE_GRAINED_DELAY
+     * so that a close() request is noticed quickly.
+     * @throws InterruptedException if the receiver has been closed
+     */
+    @SuppressWarnings("BusyWait")
+    private void adaptivePause(long ms) throws InterruptedException {
+        if (interrupted) {
+            throw new InterruptedException();
+        }
+        long currentTime = currentTimeMillis();
+        long stopTime = currentTime + ms;
+        while (currentTime < stopTime) {
+            if (interrupted) {
+                throw new InterruptedException();
+            }
+            // Never sleep past stopTime
+            sleep(min(FINE_GRAINED_DELAY, stopTime - currentTime));
+            currentTime = currentTimeMillis();
+        }
+    }
+
+}
diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSender.java
similarity index 63%
rename from org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
rename to org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSender.java
index 77815ce..cd187f6 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/MemoryMessage.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSender.java
@@ -15,30 +15,27 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package org.apache.aries.events.memory;
-
-import java.util.Map;
+package org.apache.aries.events.mongo;
import org.apache.aries.events.api.Message;
-class MemoryMessage implements Message {
+import java.util.Map;
- private byte[] payload;
- private Map<String, String> properties;
+/**
+ * Provides an API for publishing data to a distribution log
+ */
+public interface MessageSender extends AutoCloseable {
- MemoryMessage(byte[] payload, Map<String, String> props) {
- this.payload = payload;
- properties = props;
- }
-
- @Override
- public byte[] getPayload() {
- return this.payload;
- }
-
- @Override
- public Map<String, String> getProperties() {
- return this.properties;
- }
+ /**
+ * Publishes a single message to a log.
+ * @param message specifies a message to publish.
+ * The following value types are supported:
+ * Integer
+ * Long
+ * Boolean
+ * String
+ * byte[]
+ */
+ void send(Message message);
}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java
new file mode 100644
index 0000000..7f60be1
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.MongoWriteException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.IndexOptions;
+import org.apache.aries.events.api.Message;
+import org.bson.Document;
+import org.slf4j.Logger;
+
+import java.util.function.Consumer;
+
+import static com.mongodb.client.model.Filters.lt;
+import static java.lang.System.currentTimeMillis;
+import static org.apache.aries.events.mongo.Common.Fields.INDEX;
+import static org.apache.aries.events.mongo.Common.Fields.PAYLOAD;
+import static org.apache.aries.events.mongo.Common.Fields.PROPS;
+import static org.apache.aries.events.mongo.Common.Fields.TIME_STAMP;
+import static org.apache.aries.events.mongo.Common.upcomingIndex;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * MessageSender that appends messages to a MongoDB collection, assigning
+ * each one the next free log index, and removes documents older than maxAge.
+ */
+final class MessageSenderImpl implements MessageSender {
+
+    //*********************************************
+    // Creation
+    //*********************************************
+
+    /**
+     * Creates a sender on top of the specified collection
+     * @param col collection to append to
+     * @param maxAge retention time (ms) used for eviction
+     * @return a new sender
+     */
+    static MessageSender messageSender(MongoCollection<Document> col, long maxAge) {
+        return new MessageSenderImpl(col, maxAge);
+    }
+
+    //*********************************************
+    // Specialization
+    //*********************************************
+
+    /** Stores the message (retrying up to 3 times on write errors) and then runs eviction if it is due */
+    @Override
+    public void send(Message message) {
+        publish1(message, 3);
+        evict();
+    }
+
+    @Override
+    public void close() {}
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    private static final Logger LOGGER = getLogger(MessageSenderImpl.class);
+    private final MongoCollection<Document> collection;
+    /** Eviction is skipped until the clock passes this time (ms) */
+    private long nextEvictionTime = 0L;
+    private final long maxAge;
+
+    private MessageSenderImpl(MongoCollection<Document> collection, long maxAge) {
+        LOGGER.debug("Creating new publisher: {}", collection.getNamespace().getCollectionName());
+        ensureIndexes(collection);
+        this.collection = collection;
+        this.maxAge = maxAge;
+    }
+
+    /**
+     * Runs eviction once the current time passes nextEvictionTime and then
+     * re-arms it to the moment the oldest remaining document reaches maxAge.
+     */
+    private void evict() {
+        long currentTime = currentTimeMillis();
+        if (currentTime > nextEvictionTime) {
+            doEvict(currentTime - maxAge);
+            nextEvictionTime = oldestTimeStamp() + maxAge;
+        }
+    }
+
+    /**
+     * Deletes documents that are older than the specified threshold while preserving at least one document.
+     * At least one document is needed in the collection in order to keep track of the oldest time stamp.
+     * @param threshold time threshold (ms). Documents older than the threshold are removed.
+     */
+    private void doEvict(long threshold) {
+        collection.find()
+                .projection(new Document(TIME_STAMP, 1))
+                .sort(new Document(TIME_STAMP, -1))
+                .limit(1)
+                .forEach((Consumer<Document>) doc -> {
+                    long newestTimeStamp = timeStamp(doc);
+                    // Capping at the newest time stamp keeps the newest document alive
+                    long adjustedThreshold = Math.min(threshold, newestTimeStamp);
+                    collection.deleteMany(lt(TIME_STAMP, adjustedThreshold));
+                });
+    }
+
+    /**
+     * Inserts the message under the upcoming index. The unique index on
+     * INDEX rejects the insert when that index is already taken; the
+     * insert is then retried with a freshly computed index.
+     * @param retry number of retries left before the write error is rethrown
+     */
+    private void publish1(Message message, int retry) {
+        try {
+            long index = upcomingIndex(collection);
+            collection.insertOne(createDoc(index, message));
+        } catch (MongoWriteException e) {
+            if (retry > 0) {
+                publish1(message, retry - 1);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    /** Returns the smallest time stamp in the collection, 0 if the collection is empty */
+    private long oldestTimeStamp() {
+        // Only one field of one document is needed, so project and limit
+        // instead of opening a cursor over complete documents.
+        try (MongoCursor<Document> docs = collection.find()
+                .projection(new Document(TIME_STAMP, 1))
+                .sort(new Document(TIME_STAMP, 1))
+                .limit(1)
+                .iterator()) {
+            return docs.hasNext() ? timeStamp(docs.next()) : 0L;
+        }
+    }
+
+    private static long timeStamp(Document doc) {
+        return doc.get(TIME_STAMP, Long.class);
+    }
+
+    /** Builds the document stored for a message: index, current time, payload and properties */
+    private Document createDoc(long index, Message message) {
+        Document result = new Document();
+        result.put(INDEX, index);
+        result.put(TIME_STAMP, currentTimeMillis());
+        result.put(PAYLOAD, message.getPayload());
+        result.put(PROPS, message.getProperties());
+        return result;
+    }
+
+    /** Creates the unique ascending index on INDEX */
+    private void ensureIndexes(MongoCollection<Document> col) {
+        col.createIndex(new Document(INDEX, 1), new IndexOptions().unique(true));
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoEndpoint.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoEndpoint.java
new file mode 100644
index 0000000..4be126d
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoEndpoint.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+/**
+ * Metatype definition of the MongoDB messaging configuration.
+ * MongoMessaging references it via @Designate and receives an
+ * instance in its activate method.
+ */
+@ObjectClassDefinition(
+ name = "MongoDB configuration",
+ description = "Mongodb URI"
+)
+public @interface MongoEndpoint {
+
+ /**
+ * MongoDB connection string. MongoMessaging parses it with MongoClientURI
+ * and falls back to Common.DEFAULT_DB_NAME when it names no database.
+ */
+ @AttributeDefinition(
+ name = "Mongo URI",
+ description = "Specifies mongodb URI as it is specified here: https://docs.mongodb.com/manual/reference/connection-string/ "
+ )
+ String mongoUri() default "mongodb://localhost:27017/aem_distribution";
+
+ /**
+ * Log retention time in milliseconds. MongoMessaging passes it to
+ * every message sender it creates.
+ */
+ @AttributeDefinition(
+ name = "Max Age",
+ description = "Log retention time expressed in milliseconds"
+ )
+ long maxAge() default 1000L * 3600 * 24 * 7; // One week in ms
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
new file mode 100644
index 0000000..454efde
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoMessaging.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
+import org.apache.aries.events.api.Subscription;
+import org.bson.Document;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.Designate;
+
+import java.util.Optional;
+
+import static org.apache.aries.events.mongo.Common.DEFAULT_DB_NAME;
+import static org.apache.aries.events.mongo.MongoPosition.index;
+import static org.apache.aries.events.mongo.MongoPosition.position;
+import static org.apache.aries.events.mongo.MongoSubscription.subscription;
+import static org.apache.aries.events.mongo.MessageSenderImpl.messageSender;
+import static org.apache.aries.events.mongo.MessageReceiverImpl.messageReceiver;
+import static org.apache.aries.events.mongo.CachingFactory.cachingFactory;
+import static org.osgi.service.component.annotations.ConfigurationPolicy.REQUIRE;
+
+/**
+ * Messaging implementation backed by MongoDB. Every topic maps to a
+ * collection of the configured database; senders are cached per topic,
+ * receivers are created per subscription.
+ */
+@Component(service = Messaging.class, configurationPolicy = REQUIRE)
+@Designate(ocd = MongoEndpoint.class)
+public class MongoMessaging implements Messaging {
+
+    /** Sends the message through the (cached) sender of the topic */
+    @Override
+    public void send(String topic, Message message) {
+        MessageSender sender = senderFactory.get(topic);
+        sender.send(message);
+    }
+
+    /**
+     * Creates a subscription reading from the collection named after the request's topic
+     * @param requestBuilder builder describing topic, start position, seek and callback
+     * @return the new subscription
+     */
+    @Override
+    public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
+        SubscribeRequest request = requestBuilder.build();
+        MongoCollection<Document> collection = database.getCollection(request.getTopic());
+        MessageReceiver receiver = messageReceiver(collection);
+        Position position = request.getPosition();
+        if (position == null) {
+            // NOTE(review): assumes getPosition() is null when the request carries
+            // no start position - confirm against SubscribeRequest. In that case
+            // start purely from the seek instead of failing on the null position.
+            return subscription(receiver, request.getSeek(), request.getCallback());
+        }
+        return subscription(receiver, index(position), request.getSeek(), request.getCallback());
+    }
+
+    /**
+     * Parses a position previously rendered by Position.positionToString()
+     * @throws NumberFormatException if the string is not a valid long
+     */
+    @Override
+    public Position positionFromString(String position) {
+        long index = Long.parseLong(position);
+        return position(index);
+    }
+
+    // *******************************************************
+    // Private
+    // *******************************************************
+
+    private CachingFactory<String, MessageSender> senderFactory;
+    private MongoClient client;
+    private MongoDatabase database;
+
+    /**
+     * Connects to the configured MongoDB and prepares the per-topic sender cache.
+     * The database named in the URI is used; DEFAULT_DB_NAME if the URI names none.
+     */
+    @Activate
+    protected void activate(MongoEndpoint config) {
+        MongoClientURI uri = new MongoClientURI(config.mongoUri());
+        client = new MongoClient(uri);
+        String dbName = Optional.ofNullable(uri.getDatabase()).orElse(DEFAULT_DB_NAME);
+        this.database = client.getDatabase(dbName);
+        this.senderFactory = cachingFactory(topic -> {
+            MongoCollection<Document> collection = database.getCollection(topic);
+            return messageSender(collection, config.maxAge());
+        });
+    }
+
+    /** Closes the cached senders first and the Mongo client afterwards */
+    @Deactivate
+    protected void deactivate() {
+        try {
+            senderFactory.close();
+        } finally {
+            client.close();
+        }
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
new file mode 100644
index 0000000..910b15a
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoPosition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.apache.aries.events.api.Position;
+
+/**
+ * Position within a MongoDB backed log, represented by the log index.
+ */
+class MongoPosition implements Position {
+
+    /** Creates the position for the specified log index */
+    static Position position(long index) {
+        return new MongoPosition(index);
+    }
+
+    /**
+     * Extracts the log index from a position
+     * @throws ClassCastException if the position is not a MongoPosition
+     */
+    static long index(Position position) {
+        return ((MongoPosition) position).index;
+    }
+
+    /** Renders the index as a decimal string */
+    @Override
+    public String positionToString() {
+        return String.valueOf(index);
+    }
+
+    /**
+     * Orders positions by their log index
+     * @throws ClassCastException if the other position is not a MongoPosition
+     */
+    @Override
+    public int compareTo(Position o) {
+        return Long.compare(this.index, ((MongoPosition) o).index);
+    }
+
+    /** Two positions are equal when they carry the same index, keeping equals consistent with compareTo */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        return index == ((MongoPosition) o).index;
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(index);
+    }
+
+    // *******************************************************
+    // Private
+    // *******************************************************
+
+    private final long index;
+
+    private MongoPosition(long index) {
+        this.index = index;
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java
new file mode 100644
index 0000000..f991502
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.slf4j.Logger;
+
+import java.util.function.Consumer;
+
+import static java.lang.Thread.currentThread;
+import static java.lang.Thread.interrupted;
+import static org.apache.aries.events.mongo.MongoPosition.position;
+import static org.slf4j.LoggerFactory.getLogger;
+
+final class MongoSubscription implements Subscription {
+
+    //*********************************************
+    // Creation
+    //*********************************************
+
+    /** Subscribes from an explicit, non-negative log index; {@code fallBack} is accepted but not used here. */
+    static MongoSubscription subscription(
+            MessageReceiver receiver, long index, Seek fallBack, Consumer<Received> consumer) {
+        assert index >= 0L : "Illegal log index: [" + index + "]";
+        return new MongoSubscription(receiver, index, consumer);
+    }
+
+    /** Subscribes from the earliest or the latest index, as reported by the receiver at creation time. */
+    static MongoSubscription subscription(MessageReceiver receiver, Seek seek, Consumer<Received> consumer) {
+        switch (seek) {
+            case latest:
+                return new MongoSubscription(receiver, LATEST_INDEX, consumer);
+            case earliest:
+                return new MongoSubscription(receiver, EARLIEST_INDEX, consumer);
+            default:
+                throw new AssertionError(seek);
+        }
+    }
+
+    //*********************************************
+    // Package interface
+    //*********************************************
+
+    /** Returns the log index of the next message this subscription will try to receive. */
+    long index() {
+        return index;
+    }
+
+    //*********************************************
+    // Specialization
+    //*********************************************
+
+    @Override
+    public void close() {
+        receiver.close();
+    }
+
+    @Override
+    public String toString() {
+        return "Subscription" + receiver + '[' + index + ']';
+    }
+
+    //*********************************************
+    // Private
+    //*********************************************
+
+    // Negative sentinels that the constructor replaces with indexes obtained from the receiver.
+    private static final long LATEST_INDEX = -1;
+    private static final long EARLIEST_INDEX = -2;
+    private static final Logger LOGGER = getLogger(MongoSubscription.class);
+    private final MessageReceiver receiver;
+    private long index;
+    private final Consumer<Received> consumer;
+
+    private MongoSubscription(MessageReceiver receiver, long index, Consumer<Received> consumer) {
+        this.consumer = consumer;
+        this.receiver = receiver;
+        // Map the seek sentinels to real log indexes; this.index must not be reassigned from the raw argument.
+        if (index == EARLIEST_INDEX) {
+            this.index = receiver.earliestIndex();
+        } else if (index == LATEST_INDEX) {
+            this.index = receiver.latestIndex();
+        } else {
+            this.index = index;
+        }
+        startBackgroundThread(() -> poll(receiver), "MongoMessageConsumer-" + receiver);
+    }
+
+    /** Delivers messages to the consumer one index at a time until the polling thread is interrupted. */
+    private void poll(MessageReceiver receiver) {
+        while (!interrupted()) {
+            try {
+                Message message = receiver.receive(index);
+                LOGGER.debug("Received: {}", message);
+                Received received = new Received(position(index), message);
+                consumer.accept(received);
+                index += 1L;
+            } catch (InterruptedException e) {
+                currentThread().interrupt();
+            } catch (Exception e) {
+                LOGGER.error("Error handling message", e);
+            }
+        }
+        LOGGER.debug("Quitting {}", this);
+        receiver.close();
+    }
+
+    private static Thread startBackgroundThread(Runnable runnable, String threadName) {
+        Thread thread = new Thread(runnable, threadName);
+        thread.setDaemon(true);
+        thread.start();
+        return thread;
+    }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/MongoProvider.java b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/MongoProvider.java
new file mode 100644
index 0000000..783c702
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/MongoProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.bson.Document;
+import org.junit.rules.ExternalResource;
+
+import java.util.Optional;
+import java.util.logging.Logger;
+
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Provides connection to an external mongodb instance
+ * New database gets created for each test and dropped
+ * afterwards.
+ * The database URL must be provided via the
+ * aries.events.test.mongoUri system property.
+ */
+public class MongoProvider extends ExternalResource {
+
+    /** Returns the named collection of the database selected in {@link #before()}. */
+    MongoCollection<Document> getCollection(String name) {
+        return database.getCollection(name);
+    }
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    private static final String MONGO_URI_PROP = "aries.events.test.mongoUri";
+    private static final String DEFAULT_DB_NAME = "tmp_aries_events_test";
+    private MongoDatabase database;
+    private MongoClient client;
+
+    @Override
+    protected void before() {
+        String mongoUri = mongoUri();
+        client = MongoClients.create(mongoUri);
+        // Use the database named in the URI, or the default test database when the URI names none.
+        String dbName = Optional.ofNullable(new MongoClientURI(mongoUri).getDatabase()).orElse(DEFAULT_DB_NAME);
+        database = client.getDatabase(dbName);
+    }
+
+    @Override
+    protected void after() {
+        // Close the client even if dropping the test database fails, so the connection is not leaked.
+        try {
+            if (database != null) database.drop();
+        } finally {
+            if (client != null) client.close();
+        }
+    }
+
+    private static String mongoUri() {
+        String result = System.getProperty(MONGO_URI_PROP);
+        if (result == null) {
+            String message = "No mongo URI provided.\n" +
+                    " In order to enable mongo tests, define " + MONGO_URI_PROP + " system property\n" +
+                    " to point to a running instance of mongodb.\n" +
+                    " Example:\n" + " mvn test -D" + MONGO_URI_PROP + "=mongodb://localhost:27017/";
+            System.out.println("WARNING: " + message);
+            assumeTrue(message, false); // always false: tells JUnit the precondition for these tests is not met
+        }
+        return result;
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/SenderReceiverTest.java b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/SenderReceiverTest.java
new file mode 100644
index 0000000..f48627d
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/SenderReceiverTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.aries.events.api.Message;
+import org.bson.Document;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.aries.events.mongo.MessageReceiverImpl.messageReceiver;
+import static org.apache.aries.events.mongo.MessageSenderImpl.messageSender;
+import static org.junit.Assert.assertEquals;
+
+public class SenderReceiverTest {
+
+    /** A message sent to the collection is expected to be read back equal from index 0. */
+    @Test public void testReplicate() throws InterruptedException {
+        MongoCollection<Document> collection = mongoProvider.getCollection("events");
+        MessageSender sender = messageSender(collection, 1000 * 60 * 60 * 24 * 7);
+        MessageReceiver receiver = messageReceiver(collection);
+        Map<String, String> properties = mapOf(keyVal("key1", "val1"), keyVal("key2", "val2"));
+        Message expected = new Message(new byte[]{ 1, 2, 3 }, properties);
+        sender.send(expected);
+        sender.send(expected);
+        Message actual = receiver.receive(0);
+        assertEquals(expected, actual);
+    }
+
+    /** With 0 as the second messageSender argument, receiving index 0 must fail with NoSuchElementException. */
+    @Test(expected = NoSuchElementException.class)
+    public void testEvicted() throws InterruptedException {
+        MongoCollection<Document> collection = mongoProvider.getCollection("events");
+        MessageSender sender = messageSender(collection, 0);
+        MessageReceiver receiver = messageReceiver(collection);
+        Message expected = new Message(new byte[] { 1, 2, 3}, emptyMap());
+        sender.send(expected);
+        sender.send(expected);
+        receiver.receive(0);
+    }
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    @Rule
+    public MongoProvider mongoProvider = new MongoProvider();
+
+    private static Map.Entry<String, String> keyVal(String key, String value) {
+        return new SimpleEntry<>(key, value);
+    }
+
+    /** Builds a mutable map from the given entries; the varargs array is only read, hence {@code @SafeVarargs}. */
+    @SafeVarargs
+    private static Map<String, String> mapOf(Map.Entry<String, String>... mappings) {
+        Map<String, String> result = new HashMap<>();
+        for (Map.Entry<String, String> entry : mappings) {
+            result.put(entry.getKey(), entry.getValue());
+        }
+        return result;
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 44c5c17..6366297 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,21 +38,19 @@
<url>https://issues.apache.org/jira/browse/ARIES</url>
</issueManagement>
- <prerequisites>
- <maven>3.5</maven>
- </prerequisites>
-
<inceptionYear>2018</inceptionYear>
<modules>
<module>org.apache.aries.events.api</module>
<module>org.apache.aries.events.memory</module>
+ <module>org.apache.aries.events.mongo</module>
+ <module>org.apache.aries.events.kafka</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<bnd.version>4.1.0</bnd.version>
- <slf4j.version>1.7.14</slf4j.version>
+ <slf4j.version>1.7.25</slf4j.version>
<log4j.version>1.2.6</log4j.version>
<exam.version>4.12.0</exam.version>
</properties>
@@ -85,6 +83,11 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>2.0.0</version>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -95,6 +98,7 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.20.0</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -126,6 +130,7 @@
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.hamcrest</artifactId>
<version>1.3_1</version>
+ <scope>test</scope>
</dependency>
</dependencies>
@@ -166,12 +171,6 @@
<pluginManagement>
<plugins>
<plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <version>4.1.0</version>
- <extensions>true</extensions>
- </plugin>
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
@@ -229,7 +228,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
@@ -250,7 +248,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
- <version>3.1.0</version>
<configuration>
<archive>
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>