SLING-8414 - Add support for Azure Event Hubs as backend

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index b4b78b9..75c3e99 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -22,6 +22,7 @@
 import static java.util.Collections.singleton;
 import static java.util.Collections.unmodifiableMap;
 import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
@@ -33,6 +34,8 @@
 import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -89,11 +92,20 @@
 
     private int defaultApiTimeout;
 
+    private String securityProtocol;
+
+    private String saslMechanism;
+
+    private String saslJaasConfig;
+
     @Activate
     public void activate(KafkaEndpoint kafkaEndpoint) {
         kafkaBootstrapServers = requireNonNull(kafkaEndpoint.kafkaBootstrapServers());
         requestTimeout = kafkaEndpoint.kafkaRequestTimeout();
         defaultApiTimeout = kafkaEndpoint.kafkaDefaultApiTimeout();
+        securityProtocol = kafkaEndpoint.securityProtocol();
+        saslMechanism = kafkaEndpoint.saslMechanism();
+        saslJaasConfig = kafkaEndpoint.saslJaasConfig();
     }
     
     @Deactivate
@@ -187,7 +199,13 @@
 
     protected <T> KafkaConsumer<String, T> createConsumer(Class<? extends Deserializer<?>> deserializer, Reset reset) {
         String groupId = UUID.randomUUID().toString();
-        return new KafkaConsumer<>(consumerConfig(deserializer, groupId, reset));
+        ClassLoader oldClassloader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(KafkaConsumer.class.getClassLoader());
+        try {
+            return new KafkaConsumer<>(consumerConfig(deserializer, groupId, reset));
+        } finally {
+            Thread.currentThread().setContextClassLoader(oldClassloader);
+        }
     }
 
     private void closeQuietly(final Closeable closeable) {
@@ -203,6 +221,12 @@
     @Nonnull
     private synchronized KafkaProducer<String, byte[]> buildKafkaProducer() {
         if (rawProducer == null) {
-            rawProducer = new KafkaProducer<>(producerConfig(ByteArraySerializer.class));
+            // Kafka's client looks up login modules / serializers via the thread context
+            // classloader, which is wrong inside OSGi; temporarily switch to the
+            // classloader that loaded the Kafka client classes, and always restore it.
+            ClassLoader oldClassloader = Thread.currentThread().getContextClassLoader();
+            Thread.currentThread().setContextClassLoader(KafkaProducer.class.getClassLoader());
+            try {
+                rawProducer = new KafkaProducer<>(producerConfig(ByteArraySerializer.class));
+            } finally {
+                Thread.currentThread().setContextClassLoader(oldClassloader);
+            }
         }
         return rawProducer;
@@ -217,8 +243,7 @@
     }
 
     private Map<String, Object> consumerConfig(Object deserializer, String consumerGroupId, Reset reset) {
-        Map<String, Object> config = new HashMap<>();
-        config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
+        Map<String, Object> config = commonConfig();
         config.put(GROUP_ID_CONFIG, consumerGroupId);
         config.put(ENABLE_AUTO_COMMIT_CONFIG, false);
         config.put(DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeout);
@@ -229,15 +254,25 @@
     }
 
     private Map<String, Object> producerConfig(Object serializer) {
-        Map<String, Object> config = new HashMap<>();
+        Map<String, Object> config = commonConfig();
+        config.put(REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
         config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         config.put(VALUE_SERIALIZER_CLASS_CONFIG, serializer);
-        config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
-        config.put(REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
         config.put(ACKS_CONFIG, "all");
         return unmodifiableMap(config);
     }
 
+    // Settings shared by producer and consumer: bootstrap servers plus the
+    // security/SASL options needed e.g. for Azure Event Hubs (SASL_SSL + PLAIN).
+    private Map<String, Object> commonConfig() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
+        config.put(SASL_MECHANISM, saslMechanism);
+        config.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
+        if (saslJaasConfig != null && !saslJaasConfig.isEmpty()) {
+            config.put(SASL_JAAS_CONFIG, saslJaasConfig);
+        }
+        return config;
+    }
+
     private long offset(String assign) {
         String[] chunks = assign.split(":");
         if (chunks.length != 2) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
index 3e5428b..7301005 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
@@ -36,4 +36,17 @@
     @AttributeDefinition(name = "Kafka Default API Timeout",
             description = "Kafka Default API Timeout in ms.")
     int kafkaDefaultApiTimeout() default 60000;
+    
+    @AttributeDefinition(name = "Security protocol",
+            description = "Protocol used to communicate with the brokers, e.g. SASL_SSL")
+    String securityProtocol() default "PLAINTEXT";
+    
+    @AttributeDefinition(name = "Sasl mechanism",
+            description = "SASL mechanism used for client connections, e.g. PLAIN")
+    String saslMechanism() default "GSSAPI";
+    
+    @AttributeDefinition(name = "Sasl jaas config",
+            description = "JAAS login context parameters for SASL connections, in the format used by sasl.jaas.config. Leave empty to not set it.")
+    String saslJaasConfig() default "";
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 687d253..8dfec8f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -97,10 +97,17 @@
     }
 
     private void handleRecord(ConsumerRecord<String, byte[]> record) {
-        Class<?> type = Types.getType(
-                parseInt(getHeaderValue(record.headers(), "type")),
-                parseInt(getHeaderValue(record.headers(), "version")));
-        HandlerAdapter<?> adapter = handlers.get(type);
+        Class<?> type;
+        HandlerAdapter<?> adapter;
+        try {
+            type = Types.getType(
+                    parseInt(getHeaderValue(record.headers(), "type")),
+                    parseInt(getHeaderValue(record.headers(), "version")));
+            adapter = handlers.get(type);
+        } catch (RuntimeException e) {
+            // Missing/unparseable type or version header: skip the record but keep the cause visible.
+            LOG.info("Ignoring unknown message: {}", e.getMessage());
+            return;
+        }
         if (adapter != null) {
             try {
                 handleRecord(adapter, record);
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index cd7b48a..431f692 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -49,16 +49,17 @@
     
     @ClassRule
     public static KafkaRule kafka = new KafkaRule();
+    private MessagingProvider provider;
     
     @Before
     public void before() {
         MockitoAnnotations.initMocks(this);
         topicName = "MessagingTest" + UUID.randomUUID().toString();
+        this.provider = kafka.getProvider();
     }
     
     @Test
     public void testSendReceive() throws Exception {
-        MessagingProvider provider = kafka.getProvider();
         HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
         Closeable poller = provider.createPoller(topicName, Reset.earliest, handler);
         DiscoveryMessage msg = DiscoveryMessage.newBuilder()
@@ -72,16 +73,14 @@
                 .build();
         MessageSender<DiscoveryMessage> messageSender = provider.createSender();
         
-        // After starting Kafka, sending and receiving should work
         messageSender.send(topicName, msg);
-        assertReceived();
+        assertReceived("Consumer started from earliest .. should see our message");
 
         poller.close();
     }
     
     @Test
     public void testAssign() throws Exception {
-        MessagingProvider provider = kafka.getProvider();
         DiscoveryMessage msg = DiscoveryMessage.newBuilder()
                 .setSubAgentName("sub1agent")
                 .setSubSlingId("subsling")
@@ -95,37 +94,34 @@
         messageSender.send(topicName, msg);
         
         HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
-        long offset;
         try (Closeable poller = provider.createPoller(topicName, Reset.earliest, handler)) {
-            assertReceived();
-            offset = lastInfo.getOffset();
+            assertReceived("Starting from earliest .. should see our message");
         }
+        long offset = lastInfo.getOffset();
         
-        // Starting from old offset .. should see our message
         String assign = "0:" + offset;
         try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign, handler)) {
-            assertReceived();
+            assertReceived("Starting from old offset .. should see our message");
             assertThat(lastInfo.getOffset(), equalTo(offset));
         }
         
-        // Starting from invalid offset. Should see old message as we start from earliest
         String invalid = "0:32532523453";
-        try (Closeable poller = provider.createPoller(topicName, Reset.earliest, invalid, handler)) {
-            assertReceived();
+        try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, invalid, handler)) {
+            assertNotReceived("Should not see any message as we start from latest");
         }
         
-        // Starting from invalid offset. Should not see any message as we start from latest
-        try (Closeable poller = provider.createPoller(topicName, Reset.latest, invalid, handler)) {
-            assertNotReceived();
+        String invalid1 = "0:32532523453";
+        try (Closeable poller2 = provider.createPoller(topicName, Reset.earliest, invalid1, handler)) {
+            assertReceived("Should see old message as we start from earliest");
         }
     }
 
-    private void assertReceived() throws InterruptedException {
-        assertTrue(sem.tryAcquire(30, TimeUnit.SECONDS));
+    private void assertReceived(String message) throws InterruptedException {
+        assertTrue(message, sem.tryAcquire(30, TimeUnit.SECONDS));
     }
     
-    private void assertNotReceived() throws InterruptedException {
-        assertFalse(sem.tryAcquire(2, TimeUnit.SECONDS));
+    private void assertNotReceived(String message) throws InterruptedException {
+        assertFalse(message, sem.tryAcquire(2, TimeUnit.SECONDS));
     }
 
     private void handle(MessageInfo info, DiscoveryMessage message) {