Merge pull request #1 from cschneider/SLING-8402

SLING-8402 Only depend on messages bundle
diff --git a/pom.xml b/pom.xml
index 6ba33f0..a72608b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@
         <!-- Sling -->
         <dependency>
             <groupId>org.apache.sling</groupId>
-            <artifactId>org.apache.sling.distribution.journal</artifactId>
+            <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
             <version>0.1.1-SNAPSHOT</version>
         </dependency>
         <!-- OSGi -->
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index dfc0936..9216d45 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -35,6 +35,7 @@
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -43,7 +44,6 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.TopicPartition;
@@ -95,8 +95,8 @@
     
     @Deactivate
     public void close() {
-        IOUtils.closeQuietly(rawProducer);
-        IOUtils.closeQuietly(jsonProducer);
+        closeQuietly(rawProducer);
+        closeQuietly(jsonProducer);
     }
 
     @Override
@@ -181,6 +181,16 @@
         return format("%s:%s", PARTITION, offset);
     }
 
+    private void closeQuietly(final Closeable closeable) {
+        try {
+            if (closeable != null) {
+                closeable.close();
+            }
+        } catch (final IOException ioe) {
+            // Deliberately ignored: closing is best-effort, so a failing close must not abort close().
+        }
+    }
+
     @Nonnull
     private synchronized KafkaProducer<String, byte[]> buildKafkaProducer() {
         if (rawProducer == null) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
index bfa9abe..4eebfd0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.distribution.journal.kafka;
 
-import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.sling.distribution.journal.MessageInfo;
 
 public class KafkaMessageInfo implements MessageInfo {
@@ -53,6 +52,7 @@
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this);
+        return String.format("Topic: %s, Partition: %d, Offset: %d, CreateTime: %d", 
+                topic, partition, offset, createTime);
     }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
index 8b8b74c..7f73649 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
@@ -21,8 +21,9 @@
 import static org.osgi.util.converter.Converters.standardConverter;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.kafka.KafkaClientProvider;
 import org.apache.sling.distribution.journal.kafka.KafkaEndpoint;
@@ -30,8 +31,6 @@
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
 
-import com.google.common.collect.ImmutableMap;
-
 public class KafkaRule implements TestRule {
 
     private KafkaClientProvider provider;
@@ -51,14 +50,15 @@
         try (KafkaLocal kafka = new KafkaLocal()) {
             this.provider = createProvider();
             base.evaluate();
-            IOUtils.closeQuietly(this.provider);
+            this.provider.close();
         }
     }
 
     private KafkaClientProvider createProvider() {
         KafkaClientProvider provider = new KafkaClientProvider();
-        ImmutableMap<String, String> props = ImmutableMap.of(
-                "connectTimeout", "5000");
+        
+        Map<String, String> props = new HashMap<>();
+        props.put("connectTimeout", "5000");
         KafkaEndpoint config = standardConverter().convert(props).to(KafkaEndpoint.class);
         provider.activate(config);
         return provider;