SLING-8414 - Add support for Azure Eventhubs as backend
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index b4b78b9..75c3e99 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -22,6 +22,7 @@
import static java.util.Collections.singleton;
import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
@@ -33,6 +34,8 @@
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import java.io.Closeable;
import java.io.IOException;
@@ -89,11 +92,20 @@
private int defaultApiTimeout;
+ private String securityProtocol;
+
+ private String saslMechanism;
+
+ private String saslJaasConfig;
+
@Activate
public void activate(KafkaEndpoint kafkaEndpoint) {
kafkaBootstrapServers = requireNonNull(kafkaEndpoint.kafkaBootstrapServers());
requestTimeout = kafkaEndpoint.kafkaRequestTimeout();
defaultApiTimeout = kafkaEndpoint.kafkaDefaultApiTimeout();
+ securityProtocol = kafkaEndpoint.securityProtocol();
+ saslMechanism = kafkaEndpoint.saslMechanism();
+ saslJaasConfig = kafkaEndpoint.saslJaasConfig();
}
@Deactivate
@@ -187,7 +199,13 @@
protected <T> KafkaConsumer<String, T> createConsumer(Class<? extends Deserializer<?>> deserializer, Reset reset) {
String groupId = UUID.randomUUID().toString();
- return new KafkaConsumer<>(consumerConfig(deserializer, groupId, reset));
+ ClassLoader oldClassloader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(KafkaConsumer.class.getClassLoader());
+ try {
+ return new KafkaConsumer<>(consumerConfig(deserializer, groupId, reset));
+ } finally {
+ Thread.currentThread().setContextClassLoader(oldClassloader);
+ }
}
private void closeQuietly(final Closeable closeable) {
@@ -203,6 +221,14 @@
@Nonnull
private synchronized KafkaProducer<String, byte[]> buildKafkaProducer() {
if (rawProducer == null) {
+ ClassLoader oldClassloader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(KafkaProducer.class.getClassLoader());
+ try {
+ rawProducer = new KafkaProducer<>(producerConfig(ByteArraySerializer.class));
+ } finally {
+ Thread.currentThread().setContextClassLoader(oldClassloader);
+ }
+
rawProducer = new KafkaProducer<>(producerConfig(ByteArraySerializer.class));
}
return rawProducer;
@@ -217,8 +243,7 @@
}
private Map<String, Object> consumerConfig(Object deserializer, String consumerGroupId, Reset reset) {
- Map<String, Object> config = new HashMap<>();
- config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
+ Map<String, Object> config = commonConfig();
config.put(GROUP_ID_CONFIG, consumerGroupId);
config.put(ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeout);
@@ -229,15 +254,25 @@
}
private Map<String, Object> producerConfig(Object serializer) {
- Map<String, Object> config = new HashMap<>();
+ Map<String, Object> config = commonConfig();
+ config.put(REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(VALUE_SERIALIZER_CLASS_CONFIG, serializer);
- config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
- config.put(REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
config.put(ACKS_CONFIG, "all");
return unmodifiableMap(config);
}
+ private Map<String, Object> commonConfig() {
+ Map<String, Object> config = new HashMap<>();
+ config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
+ config.put(SASL_MECHANISM, saslMechanism);
+ config.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
+ if (!saslJaasConfig.isEmpty()) {
+ config.put(SASL_JAAS_CONFIG, saslJaasConfig);
+ }
+ return config;
+ }
+
private long offset(String assign) {
String[] chunks = assign.split(":");
if (chunks.length != 2) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
index 3e5428b..7301005 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
@@ -36,4 +36,17 @@
@AttributeDefinition(name = "Kafka Default API Timeout",
description = "Kafka Default API Timeout in ms.")
int kafkaDefaultApiTimeout() default 60000;
+
+ @AttributeDefinition(name = "Security protocol",
+ description = "e.g. SASL_SSL")
+ String securityProtocol() default "PLAINTEXT";
+
+ @AttributeDefinition(name = "Sasl mechanism",
+ description = "e.g. PLAIN")
+ String saslMechanism() default "GSSAPI";
+
+ @AttributeDefinition(name = "Sasl jaas config",
+ description = "")
+ String saslJaasConfig() default "";
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 687d253..8dfec8f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -97,10 +97,17 @@
}
private void handleRecord(ConsumerRecord<String, byte[]> record) {
- Class<?> type = Types.getType(
- parseInt(getHeaderValue(record.headers(), "type")),
- parseInt(getHeaderValue(record.headers(), "version")));
- HandlerAdapter<?> adapter = handlers.get(type);
+ Class<?> type;
+ HandlerAdapter<?> adapter;
+ try {
+ type = Types.getType(
+ parseInt(getHeaderValue(record.headers(), "type")),
+ parseInt(getHeaderValue(record.headers(), "version")));
+ adapter = handlers.get(type);
+ } catch (RuntimeException e) {
+ LOG.info("Ignoring unknown message");
+ return;
+ }
if (adapter != null) {
try {
handleRecord(adapter, record);
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index cd7b48a..431f692 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -49,16 +49,17 @@
@ClassRule
public static KafkaRule kafka = new KafkaRule();
+ private MessagingProvider provider;
@Before
public void before() {
MockitoAnnotations.initMocks(this);
topicName = "MessagingTest" + UUID.randomUUID().toString();
+ this.provider = kafka.getProvider();
}
@Test
public void testSendReceive() throws Exception {
- MessagingProvider provider = kafka.getProvider();
HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
Closeable poller = provider.createPoller(topicName, Reset.earliest, handler);
DiscoveryMessage msg = DiscoveryMessage.newBuilder()
@@ -72,16 +73,14 @@
.build();
MessageSender<DiscoveryMessage> messageSender = provider.createSender();
- // After starting Kafka, sending and receiving should work
messageSender.send(topicName, msg);
- assertReceived();
+ assertReceived("Consumer started from earliest .. should see our message");
poller.close();
}
@Test
public void testAssign() throws Exception {
- MessagingProvider provider = kafka.getProvider();
DiscoveryMessage msg = DiscoveryMessage.newBuilder()
.setSubAgentName("sub1agent")
.setSubSlingId("subsling")
@@ -95,37 +94,34 @@
messageSender.send(topicName, msg);
HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
- long offset;
try (Closeable poller = provider.createPoller(topicName, Reset.earliest, handler)) {
- assertReceived();
- offset = lastInfo.getOffset();
+ assertReceived("Starting from earliest .. should see our message");
}
+ long offset = lastInfo.getOffset();
- // Starting from old offset .. should see our message
String assign = "0:" + offset;
try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign, handler)) {
- assertReceived();
+ assertReceived("Starting from old offset .. should see our message");
assertThat(lastInfo.getOffset(), equalTo(offset));
}
- // Starting from invalid offset. Should see old message as we start from earliest
String invalid = "0:32532523453";
- try (Closeable poller = provider.createPoller(topicName, Reset.earliest, invalid, handler)) {
- assertReceived();
+ try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, invalid, handler)) {
+ assertNotReceived("Should see old message as we start from earliest");
}
- // Starting from invalid offset. Should not see any message as we start from latest
- try (Closeable poller = provider.createPoller(topicName, Reset.latest, invalid, handler)) {
- assertNotReceived();
+ String invalid1 = "0:32532523453";
+ try (Closeable poller2 = provider.createPoller(topicName, Reset.earliest, invalid1, handler)) {
+ assertReceived("Should not see any message as we start from latest");
}
}
- private void assertReceived() throws InterruptedException {
- assertTrue(sem.tryAcquire(30, TimeUnit.SECONDS));
+ private void assertReceived(String message) throws InterruptedException {
+ assertTrue(message, sem.tryAcquire(30, TimeUnit.SECONDS));
}
- private void assertNotReceived() throws InterruptedException {
- assertFalse(sem.tryAcquire(2, TimeUnit.SECONDS));
+ private void assertNotReceived(String message) throws InterruptedException {
+ assertFalse(message, sem.tryAcquire(2, TimeUnit.SECONDS));
}
private void handle(MessageInfo info, DiscoveryMessage message) {