SLING-10130 - Allow to retrieve serverUri from MessagingProvider
diff --git a/pom.xml b/pom.xml
index e447348..0ad6a25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.sling</groupId>
<artifactId>sling-bundle-parent</artifactId>
- <version>35</version>
+ <version>40</version>
<relativePath />
</parent>
@@ -70,20 +70,18 @@
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
- <version>0.2.0-SNAPSHOT</version>
+ <version>0.2.1-SNAPSHOT</version>
</dependency>
<!-- OSGi -->
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.service.component.annotations</artifactId>
<scope>provided</scope>
- <version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.service.metatype.annotations</artifactId>
<scope>provided</scope>
- <version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
@@ -142,6 +140,7 @@
</dependency>
</dependencies>
+
<reporting>
<plugins>
<plugin>
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index ea4b968..ec31888 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -39,6 +39,8 @@
import java.io.Closeable;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -80,31 +82,37 @@
public static final int PARTITION = 0;
- @Reference
- private EventAdmin eventAdmin;
+ private final ExceptionEventSender eventSender;
- private ExceptionEventSender eventSender;
+ private final String kafkaBootstrapServers;
- private KafkaProducer<String, byte[]> rawProducer = null;
+ private final int requestTimeout;
- private KafkaProducer<String, String> jsonProducer = null;
+ private final int defaultApiTimeout;
- private String kafkaBootstrapServers;
+ private final String securityProtocol;
- private int requestTimeout;
+ private final String saslMechanism;
- private int defaultApiTimeout;
+ private final String saslJaasConfig;
- private String securityProtocol;
+ private final URI serverUri;
- private String saslMechanism;
-
- private String saslJaasConfig;
+ private transient KafkaProducer<String, String> jsonProducer = null;
@Activate
- public void activate(KafkaEndpoint kafkaEndpoint) {
+ public KafkaClientProvider(
+ @Reference EventAdmin eventAdmin,
+ KafkaEndpoint kafkaEndpoint
+ ) {
eventSender = new ExceptionEventSender(eventAdmin);
kafkaBootstrapServers = requireNonNull(kafkaEndpoint.kafkaBootstrapServers());
+ String[] servers = kafkaBootstrapServers.split(",");
+ try {
+ serverUri = new URI(servers[0]);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
requestTimeout = kafkaEndpoint.kafkaRequestTimeout();
defaultApiTimeout = kafkaEndpoint.kafkaDefaultApiTimeout();
securityProtocol = kafkaEndpoint.securityProtocol();
@@ -114,7 +122,6 @@
@Deactivate
public synchronized void close() {
- closeQuietly(rawProducer);
closeQuietly(jsonProducer);
}
@@ -236,4 +243,9 @@
}
return Long.parseLong(chunks[1]);
}
+
+ @Override
+ public URI getServerUri() {
+ return serverUri;
+ }
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
index cafdd69..358a760 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
@@ -39,10 +39,8 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.service.event.EventAdmin;
@@ -53,8 +51,6 @@
@Mock
EventAdmin eventAdmin;
- @InjectMocks
- @Spy
private KafkaClientProvider provider;
@Mock
@@ -62,9 +58,9 @@
@Before
public void before() {
- doReturn(consumer).when(provider).createConsumer(Mockito.any(), Mockito.any());
KafkaEndpoint config = buildKafkaEndpoint(emptyMap());
- provider.activate(config);
+ provider = Mockito.spy(new KafkaClientProvider(eventAdmin, config));
+ doReturn(consumer).when(provider).createConsumer(Mockito.any(), Mockito.any());
}
@Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
index 851fdfe..4b60822 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
@@ -53,13 +53,10 @@
}
private KafkaClientProvider createProvider() {
- KafkaClientProvider provider = new KafkaClientProvider();
-
Map<String, Object> props = new HashMap<>();
props.put("connectTimeout", "5000");
KafkaEndpoint config = buildKafkaEndpoint(props);
- provider.activate(config);
- return provider;
+ return new KafkaClientProvider(null, config);
}
public MessagingProvider getProvider() {