Merge pull request #1 from cschneider/SLING-8402
SLING-8402 Only depend on messages bundle
diff --git a/pom.xml b/pom.xml
index 6ba33f0..a72608b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@
<!-- Sling -->
<dependency>
<groupId>org.apache.sling</groupId>
- <artifactId>org.apache.sling.distribution.journal</artifactId>
+ <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
<version>0.1.1-SNAPSHOT</version>
</dependency>
<!-- OSGi -->
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index dfc0936..9216d45 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -35,6 +35,7 @@
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import java.io.Closeable;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -43,7 +44,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
@@ -95,8 +95,8 @@
@Deactivate
public void close() {
- IOUtils.closeQuietly(rawProducer);
- IOUtils.closeQuietly(jsonProducer);
+ closeQuietly(rawProducer);
+ closeQuietly(jsonProducer);
}
@Override
@@ -181,6 +181,16 @@
return format("%s:%s", PARTITION, offset);
}
+ private void closeQuietly(final Closeable closeable) { // best-effort close; an IOException never reaches the caller
+ try {
+ if (closeable != null) { // null-safe: nothing to close when no resource was passed in
+ closeable.close();
+ }
+ } catch (final IOException ioe) {
+ // Deliberately swallowed: a failure while closing is not reported or rethrown.
+ }
+ }
+
@Nonnull
private synchronized KafkaProducer<String, byte[]> buildKafkaProducer() {
if (rawProducer == null) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
index bfa9abe..4eebfd0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
@@ -18,7 +18,6 @@
*/
package org.apache.sling.distribution.journal.kafka;
-import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.sling.distribution.journal.MessageInfo;
public class KafkaMessageInfo implements MessageInfo {
@@ -53,6 +52,7 @@
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this);
+ return String.format("Topic: %s, Partition: %d, Offset: %d, CreateTime: %d",
+ topic, partition, offset, createTime);
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
index 8b8b74c..7f73649 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
@@ -21,8 +21,9 @@
import static org.osgi.util.converter.Converters.standardConverter;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.kafka.KafkaClientProvider;
import org.apache.sling.distribution.journal.kafka.KafkaEndpoint;
@@ -30,8 +31,6 @@
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
-import com.google.common.collect.ImmutableMap;
-
public class KafkaRule implements TestRule {
private KafkaClientProvider provider;
@@ -51,14 +50,15 @@
try (KafkaLocal kafka = new KafkaLocal()) {
this.provider = createProvider();
base.evaluate();
- IOUtils.closeQuietly(this.provider);
+ this.provider.close();
}
}
private KafkaClientProvider createProvider() {
KafkaClientProvider provider = new KafkaClientProvider();
- ImmutableMap<String, String> props = ImmutableMap.of(
- "connectTimeout", "5000");
+
+ Map<String, String> props = new HashMap<>();
+ props.put("connectTimeout", "5000");
KafkaEndpoint config = standardConverter().convert(props).to(KafkaEndpoint.class);
provider.activate(config);
return provider;