SLING-8451 - Document KafkaEndpoint configuration properties and use Kafka default
* Build KafkaEndpoint using a custom converter instead of the ad-hoc mock
diff --git a/pom.xml b/pom.xml
index a72608b..eb819fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,7 +125,7 @@
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.converter</artifactId>
- <version>1.0.0</version>
+ <version>1.0.8</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaEndpointBuilder.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaEndpointBuilder.java
index be5dde9..3579274 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaEndpointBuilder.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaEndpointBuilder.java
@@ -21,29 +21,24 @@
import java.util.Map;
import org.apache.sling.distribution.journal.kafka.KafkaEndpoint;
+import org.osgi.util.converter.Converter;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.osgi.util.converter.Converters.standardConverter;
public class KafkaEndpointBuilder {
+ /**
+ * Custom converter that return null instead of throwing ConversionException
+ * for properties with null default. See FELIX-6137.
+ */
+ private static final Converter CONVERTER = standardConverter()
+ .newConverterBuilder()
+ .errorHandler((o,t) -> null)
+ .build();
+
public static KafkaEndpoint buildKafkaEndpoint(Map<String, Object> props) {
-
- /*
- * The standardConverter does not support null default
- * Until FELIX-6137 is fixed, we use this 'creative' way
- * to build KafkaEndpoint.
- */
-
- KafkaEndpoint proxy = standardConverter().convert(props).to(KafkaEndpoint.class);
- KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
- when(endpoint.saslJaasConfig()).thenReturn(null);
- when(endpoint.securityProtocol()).thenReturn(proxy.securityProtocol());
- when(endpoint.kafkaBootstrapServers()).thenReturn(proxy.kafkaBootstrapServers());
- when(endpoint.kafkaDefaultApiTimeout()).thenReturn(proxy.kafkaDefaultApiTimeout());
- when(endpoint.kafkaRequestTimeout()).thenReturn(proxy.kafkaRequestTimeout());
- when(endpoint.saslMechanism()).thenReturn(proxy.saslMechanism());
- return endpoint;
+ return CONVERTER.convert(props).to(KafkaEndpoint.class);
}
+
+
}