SLING-10130 - Allow to retrieve serverUri from MessagingProvider
diff --git a/pom.xml b/pom.xml
index e447348..0ad6a25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.sling</groupId>
         <artifactId>sling-bundle-parent</artifactId>
-        <version>35</version>
+        <version>40</version>
         <relativePath />
     </parent>
 
@@ -70,20 +70,18 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
-            <version>0.2.0-SNAPSHOT</version>
+            <version>0.2.1-SNAPSHOT</version>
         </dependency>
         <!-- OSGi -->
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.service.component.annotations</artifactId>
             <scope>provided</scope>
-            <version>1.3.0</version>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.service.metatype.annotations</artifactId>
             <scope>provided</scope>
-            <version>1.3.0</version>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
@@ -142,6 +140,7 @@
         </dependency>
 
     </dependencies>
+    
     <reporting>
         <plugins>
             <plugin>
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index ea4b968..ec31888 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -39,6 +39,8 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -80,31 +82,37 @@
 
     public static final int PARTITION = 0;
     
-    @Reference
-    private EventAdmin eventAdmin;
+    private final ExceptionEventSender eventSender;
 
-    private ExceptionEventSender eventSender;
+    private final String kafkaBootstrapServers;
 
-    private KafkaProducer<String, byte[]> rawProducer = null;
+    private final int requestTimeout;
 
-    private KafkaProducer<String, String> jsonProducer = null;
+    private final int defaultApiTimeout;
 
-    private String kafkaBootstrapServers;
+    private final String securityProtocol;
 
-    private int requestTimeout;
+    private final String saslMechanism;
 
-    private int defaultApiTimeout;
+    private final String saslJaasConfig;
 
-    private String securityProtocol;
+    private final URI serverUri;
 
-    private String saslMechanism;
-
-    private String saslJaasConfig;
+    private transient KafkaProducer<String, String> jsonProducer = null;
 
     @Activate
-    public void activate(KafkaEndpoint kafkaEndpoint) {
+    public KafkaClientProvider(
+            @Reference EventAdmin eventAdmin, 
+            KafkaEndpoint kafkaEndpoint
+            ) {
         eventSender = new ExceptionEventSender(eventAdmin);
         kafkaBootstrapServers = requireNonNull(kafkaEndpoint.kafkaBootstrapServers());
+        String[] servers = kafkaBootstrapServers.split(",");
+        try {
+            serverUri = new URI(servers[0]);
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
         requestTimeout = kafkaEndpoint.kafkaRequestTimeout();
         defaultApiTimeout = kafkaEndpoint.kafkaDefaultApiTimeout();
         securityProtocol = kafkaEndpoint.securityProtocol();
@@ -114,7 +122,6 @@
     
     @Deactivate
     public synchronized void close() {
-        closeQuietly(rawProducer);
         closeQuietly(jsonProducer);
     }
 
@@ -236,4 +243,9 @@
         }
         return Long.parseLong(chunks[1]);
     }
+
+    @Override
+    public URI getServerUri() {
+        return serverUri;
+    }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
index cafdd69..358a760 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
@@ -39,10 +39,8 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.service.event.EventAdmin;
 
@@ -53,8 +51,6 @@
     @Mock
     EventAdmin eventAdmin;
     
-    @InjectMocks
-    @Spy
     private KafkaClientProvider provider;
     
     @Mock
@@ -62,9 +58,9 @@
     
     @Before
     public void before() {
-        doReturn(consumer).when(provider).createConsumer(Mockito.any(), Mockito.any());
         KafkaEndpoint config = buildKafkaEndpoint(emptyMap());
-        provider.activate(config);
+        provider = Mockito.spy(new KafkaClientProvider(eventAdmin, config));
+        doReturn(consumer).when(provider).createConsumer(Mockito.any(), Mockito.any());
     }
 
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
index 851fdfe..4b60822 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
@@ -53,13 +53,10 @@
     }
 
     private KafkaClientProvider createProvider() {
-        KafkaClientProvider provider = new KafkaClientProvider();
-
         Map<String, Object> props = new HashMap<>();
         props.put("connectTimeout", "5000");
         KafkaEndpoint config = buildKafkaEndpoint(props);
-        provider.activate(config);
-        return provider;
+        return new KafkaClientProvider(null, config);
     }
     
     public MessagingProvider getProvider() {