NIFI-9053 Adding support for TLS and basic authentication with Druid

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5308.
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
index 948471e..6b777b3 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
@@ -42,6 +42,12 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>io.druid</groupId>
             <artifactId>tranquility-core_2.11</artifactId>
             <scope>provided</scope>
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
index 4d69699..876290b 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.Lists;
@@ -53,6 +54,7 @@
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
 
 import com.metamx.common.Granularity;
 import com.metamx.tranquility.beam.Beam;
@@ -318,6 +320,33 @@
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("druid-cs-ssl-context-service")
+            .displayName("SSL Context Service")
+            .description("Specifies the SSL Context Service to use for communicating with Druid.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
+            .name("druid-cs-basic-auth-username")
+            .displayName("Username")
+            .description("Username for authentication to Druid.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
+            .build();
+
+    public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
+            .name("druid-cs-basic-auth-password")
+            .displayName("Password")
+            .description("Password for authentication to Druid.")
+            .required(false)
+            .sensitive(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
+            .build();
+
     private static final List<PropertyDescriptor> properties;
 
     private volatile CuratorFramework curator;
@@ -347,6 +376,9 @@
         props.add(MAX_BATCH_SIZE);
         props.add(MAX_PENDING_BATCHES);
         props.add(LINGER_MILLIS);
+        props.add(SSL_CONTEXT_SERVICE);
+        props.add(PROP_BASIC_AUTH_USERNAME);
+        props.add(PROP_BASIC_AUTH_PASSWORD);
 
         properties = Collections.unmodifiableList(props);
     }
@@ -361,6 +393,8 @@
         Set<ValidationResult> results = new HashSet<>();
         final String segmentGranularity = validationContext.getProperty(SEGMENT_GRANULARITY).getValue();
         final String queryGranularity = validationContext.getProperty(QUERY_GRANULARITY).getValue();
+        final String basicAuthUsername = validationContext.getProperty(PROP_BASIC_AUTH_USERNAME).getValue();
+        final String basicAuthPassword = validationContext.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue();
 
         // Verify that segment granularity is as least as large as query granularity
         if (TIME_ORDINALS.indexOf(segmentGranularity) < TIME_ORDINALS.indexOf(queryGranularity)) {
@@ -368,6 +402,17 @@
                     "Segment Granularity must be at least as large as Query Granularity").build());
         }
 
+        // Verify that username and password are both absent or both set
+        if (StringUtils.isNotBlank(basicAuthUsername) && StringUtils.isBlank(basicAuthPassword)) {
+            results.add(new ValidationResult.Builder().subject(PROP_BASIC_AUTH_PASSWORD.getDisplayName())
+                    .explanation("it is required when '" + PROP_BASIC_AUTH_USERNAME.getDisplayName() + "' is set").build()
+            );
+        } else if (StringUtils.isBlank(basicAuthUsername) && StringUtils.isNotBlank(basicAuthPassword)) {
+            results.add(new ValidationResult.Builder().subject(PROP_BASIC_AUTH_USERNAME.getDisplayName())
+                    .explanation("it is required when '" + PROP_BASIC_AUTH_PASSWORD.getDisplayName() + "' is set").build()
+            );
+        }
+
         return results;
     }
 
@@ -396,6 +441,9 @@
         final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         final int maxPendingBatches = context.getProperty(MAX_PENDING_BATCHES).evaluateAttributeExpressions().asInteger();
         final int lingerMillis = context.getProperty(LINGER_MILLIS).evaluateAttributeExpressions().asInteger();
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final String basicAuthUsername = context.getProperty(PROP_BASIC_AUTH_USERNAME).evaluateAttributeExpressions().getValue();
+        final String basicAuthPassword = context.getProperty(PROP_BASIC_AUTH_PASSWORD).evaluateAttributeExpressions().getValue();
 
         transitUri = String.format(FIREHOSE_PATTERN, dataSource) + ";indexServicePath=" + indexService;
 
@@ -428,7 +476,8 @@
         final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null);
 
         final Beam<Map<String, Object>> beam = buildBeam(dataSource, indexService, discoveryPath, clusterPartitions, clusterReplication,
-                segmentGranularity, queryGranularity, windowPeriod, firehoseGracePeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec);
+                segmentGranularity, queryGranularity, windowPeriod, firehoseGracePeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec,
+                sslContextService, basicAuthUsername, basicAuthPassword);
 
         tranquilizer = buildTranquilizer(maxBatchSize, maxPendingBatches, lingerMillis, beam);
 
@@ -446,13 +495,14 @@
 
     Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
                                         String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, DruidDimensions dimensions,
-                                        List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
-        return DruidBeams.builder(timestamper)
+                                        List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec,
+                                        SSLContextService sslContextService, String basicAuthUsername, String basicAuthPassword) {
+        DruidBeams.Builder<Map<String, Object>, Map<String, Object>> builder = DruidBeams.builder(timestamper)
                 .curator(curator)
                 .discoveryPath(discoveryPath)
                 .location(DruidLocation.create(DruidEnvironment.create(indexService, FIREHOSE_PATTERN), dataSource))
                 .timestampSpec(timestampSpec)
-                .rollup(DruidRollup.create(dimensions, aggregator, QueryGranularity.fromString(queryGranularity)))
+                .rollup(DruidRollup.create(dimensions, aggregator, QueryGranularity.fromString(queryGranularity), true))
                 .tuning(
                         ClusteredBeamTuning
                                 .builder()
@@ -461,14 +511,42 @@
                                 .partitions(clusterPartitions)
                                 .replicants(clusterReplication)
                                 .build()
-                )
-                .druidBeamConfig(
-                        DruidBeamConfig
-                                .builder()
-                                .indexRetryPeriod(new Period(indexRetryPeriod))
-                                .firehoseGracePeriod(new Period(firehoseGracePeriod))
-                                .build())
-                .buildBeam();
+                );
+
+        if (sslContextService != null && sslContextService.isTrustStoreConfigured()) {
+            builder = builder
+                    .tlsEnable(true)
+                    .tlsProtocol(sslContextService.getSslAlgorithm())
+                    .tlsTrustStorePassword(sslContextService.getTrustStorePassword())
+                    .tlsTrustStorePath(sslContextService.getTrustStoreFile())
+                    .tlsTrustStoreType(sslContextService.getTrustStoreType());
+        }
+
+        if (StringUtils.isNotBlank(basicAuthUsername) && StringUtils.isNotBlank(basicAuthPassword)) {
+            builder = builder
+                    .druidBeamConfig(
+                            DruidBeamConfig
+                                    .builder()
+                                    .indexRetryPeriod(new Period(indexRetryPeriod))
+                                    .firehoseGracePeriod(new Period(firehoseGracePeriod))
+                                    .basicAuthUser(basicAuthUsername)
+                                    .basicAuthPass(basicAuthPassword)
+                                    .build()
+                    )
+                    .basicAuthUser(basicAuthUsername)
+                    .basicAuthPass(basicAuthPassword);
+        } else {
+            builder = builder
+                    .druidBeamConfig(
+                            DruidBeamConfig
+                                    .builder()
+                                    .indexRetryPeriod(new Period(indexRetryPeriod))
+                                    .firehoseGracePeriod(new Period(firehoseGracePeriod))
+                                    .build()
+                    );
+        }
+
+        return builder.buildBeam();
     }
 
     @OnDisabled
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml
index ae51605..66f1c5e 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml
@@ -71,5 +71,11 @@
             <version>1.15.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
index f41f164..8cab325 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
@@ -31,6 +31,7 @@
 import io.druid.data.input.impl.TimestampSpec;
 import io.druid.query.aggregation.AggregatorFactory;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.nifi.ssl.SSLContextService;
 import scala.Function1;
 import scala.Option;
 import scala.runtime.BoxedUnit;
@@ -139,7 +140,8 @@
     @Override
     Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
                                         String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, DruidDimensions dimensions,
-                                        List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
+                                        List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec,
+                                        SSLContextService sslContextService, String basicAuthUsername, String basicAuthPassword) {
         return mock(Beam.class);
     }
 
diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
index 49db42a..912e6d3 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
@@ -27,7 +27,7 @@
 
     <properties>
         <druid.version>0.9.1</druid.version>
-        <tranquility.version>0.8.2</tranquility.version>
+        <tranquility.version>0.8.3</tranquility.version>
     </properties>
 
     <dependencyManagement>
@@ -64,6 +64,12 @@
                 <artifactId>jackson-databind</artifactId>
                 <version>${jackson.version}</version>
             </dependency>
+            <!-- Override jackson 2.4.5 from tranquility -->
+            <dependency>
+                <groupId>com.fasterxml.jackson.module</groupId>
+                <artifactId>jackson-module-scala_2.11</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
             <!-- Override snakeyaml:1.11 from druid -->
             <dependency>
                 <groupId>org.yaml</groupId>