NIFI-9053 Adding support for TLS and basic authentication with Druid
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #5308.
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
index 948471e..6b777b3 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
@@ -42,6 +42,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
+ <version>1.15.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>io.druid</groupId>
<artifactId>tranquility-core_2.11</artifactId>
<scope>provided</scope>
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
index 4d69699..876290b 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
@@ -53,6 +54,7 @@
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
@@ -318,6 +320,33 @@
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("druid-cs-ssl-context-service")
+ .displayName("SSL Context Service")
+ .description("Specifies the SSL Context Service to use for communicating with Druid.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
+ .name("druid-cs-basic-auth-username")
+ .displayName("Username")
+ .description("Username for authentication to Druid.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
+ .build();
+
+ public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
+ .name("druid-cs-basic-auth-password")
+ .displayName("Password")
+ .description("Password for authentication to Druid.")
+ .required(false)
+ .sensitive(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
+ .build();
+
private static final List<PropertyDescriptor> properties;
private volatile CuratorFramework curator;
@@ -347,6 +376,9 @@
props.add(MAX_BATCH_SIZE);
props.add(MAX_PENDING_BATCHES);
props.add(LINGER_MILLIS);
+ props.add(SSL_CONTEXT_SERVICE);
+ props.add(PROP_BASIC_AUTH_USERNAME);
+ props.add(PROP_BASIC_AUTH_PASSWORD);
properties = Collections.unmodifiableList(props);
}
@@ -361,6 +393,8 @@
Set<ValidationResult> results = new HashSet<>();
final String segmentGranularity = validationContext.getProperty(SEGMENT_GRANULARITY).getValue();
final String queryGranularity = validationContext.getProperty(QUERY_GRANULARITY).getValue();
+ final String basicAuthUsername = validationContext.getProperty(PROP_BASIC_AUTH_USERNAME).getValue();
+ final String basicAuthPassword = validationContext.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue();
// Verify that segment granularity is as least as large as query granularity
if (TIME_ORDINALS.indexOf(segmentGranularity) < TIME_ORDINALS.indexOf(queryGranularity)) {
@@ -368,6 +402,17 @@
"Segment Granularity must be at least as large as Query Granularity").build());
}
+ // Verify that username and password are both absent or both set
+ if (StringUtils.isNotBlank(basicAuthUsername) && StringUtils.isBlank(basicAuthPassword)) {
+ results.add(new ValidationResult.Builder().subject(PROP_BASIC_AUTH_PASSWORD.getDisplayName())
+ .explanation("it is required when '" + PROP_BASIC_AUTH_USERNAME.getDisplayName() + "' is set").build()
+ );
+ } else if (StringUtils.isBlank(basicAuthUsername) && StringUtils.isNotBlank(basicAuthPassword)) {
+ results.add(new ValidationResult.Builder().subject(PROP_BASIC_AUTH_USERNAME.getDisplayName())
+ .explanation("it is required when '" + PROP_BASIC_AUTH_PASSWORD.getDisplayName() + "' is set").build()
+ );
+ }
+
return results;
}
@@ -396,6 +441,9 @@
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final int maxPendingBatches = context.getProperty(MAX_PENDING_BATCHES).evaluateAttributeExpressions().asInteger();
final int lingerMillis = context.getProperty(LINGER_MILLIS).evaluateAttributeExpressions().asInteger();
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final String basicAuthUsername = context.getProperty(PROP_BASIC_AUTH_USERNAME).evaluateAttributeExpressions().getValue();
+ final String basicAuthPassword = context.getProperty(PROP_BASIC_AUTH_PASSWORD).evaluateAttributeExpressions().getValue();
transitUri = String.format(FIREHOSE_PATTERN, dataSource) + ";indexServicePath=" + indexService;
@@ -428,7 +476,8 @@
final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null);
final Beam<Map<String, Object>> beam = buildBeam(dataSource, indexService, discoveryPath, clusterPartitions, clusterReplication,
- segmentGranularity, queryGranularity, windowPeriod, firehoseGracePeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec);
+ segmentGranularity, queryGranularity, windowPeriod, firehoseGracePeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec,
+ sslContextService, basicAuthUsername, basicAuthPassword);
tranquilizer = buildTranquilizer(maxBatchSize, maxPendingBatches, lingerMillis, beam);
@@ -446,13 +495,14 @@
Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, DruidDimensions dimensions,
- List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
- return DruidBeams.builder(timestamper)
+ List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec,
+ SSLContextService sslContextService, String basicAuthUsername, String basicAuthPassword) {
+ DruidBeams.Builder<Map<String, Object>, Map<String, Object>> builder = DruidBeams.builder(timestamper)
.curator(curator)
.discoveryPath(discoveryPath)
.location(DruidLocation.create(DruidEnvironment.create(indexService, FIREHOSE_PATTERN), dataSource))
.timestampSpec(timestampSpec)
- .rollup(DruidRollup.create(dimensions, aggregator, QueryGranularity.fromString(queryGranularity)))
+ .rollup(DruidRollup.create(dimensions, aggregator, QueryGranularity.fromString(queryGranularity), true))
.tuning(
ClusteredBeamTuning
.builder()
@@ -461,14 +511,42 @@
.partitions(clusterPartitions)
.replicants(clusterReplication)
.build()
- )
- .druidBeamConfig(
- DruidBeamConfig
- .builder()
- .indexRetryPeriod(new Period(indexRetryPeriod))
- .firehoseGracePeriod(new Period(firehoseGracePeriod))
- .build())
- .buildBeam();
+ );
+
+ if (sslContextService != null && sslContextService.isTrustStoreConfigured()) {
+ builder = builder
+ .tlsEnable(true)
+ .tlsProtocol(sslContextService.getSslAlgorithm())
+ .tlsTrustStorePassword(sslContextService.getTrustStorePassword())
+ .tlsTrustStorePath(sslContextService.getTrustStoreFile())
+ .tlsTrustStoreType(sslContextService.getTrustStoreType());
+ }
+
+ if (StringUtils.isNotBlank(basicAuthUsername) && StringUtils.isNotBlank(basicAuthPassword)) {
+ builder = builder
+ .druidBeamConfig(
+ DruidBeamConfig
+ .builder()
+ .indexRetryPeriod(new Period(indexRetryPeriod))
+ .firehoseGracePeriod(new Period(firehoseGracePeriod))
+ .basicAuthUser(basicAuthUsername)
+ .basicAuthPass(basicAuthPassword)
+ .build()
+ )
+ .basicAuthUser(basicAuthUsername)
+ .basicAuthPass(basicAuthPassword);
+ } else {
+ builder = builder
+ .druidBeamConfig(
+ DruidBeamConfig
+ .builder()
+ .indexRetryPeriod(new Period(indexRetryPeriod))
+ .firehoseGracePeriod(new Period(firehoseGracePeriod))
+ .build()
+ );
+ }
+
+ return builder.buildBeam();
}
@OnDisabled
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml
index ae51605..66f1c5e 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml
@@ -71,5 +71,11 @@
<version>1.15.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
+ <version>1.15.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
index f41f164..8cab325 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
@@ -31,6 +31,7 @@
import io.druid.data.input.impl.TimestampSpec;
import io.druid.query.aggregation.AggregatorFactory;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.nifi.ssl.SSLContextService;
import scala.Function1;
import scala.Option;
import scala.runtime.BoxedUnit;
@@ -139,7 +140,8 @@
@Override
Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, DruidDimensions dimensions,
- List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
+ List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec,
+ SSLContextService sslContextService, String basicAuthUsername, String basicAuthPassword) {
return mock(Beam.class);
}
diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
index 49db42a..912e6d3 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
@@ -27,7 +27,7 @@
<properties>
<druid.version>0.9.1</druid.version>
- <tranquility.version>0.8.2</tranquility.version>
+ <tranquility.version>0.8.3</tranquility.version>
</properties>
<dependencyManagement>
@@ -64,6 +64,12 @@
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
+ <!-- Override jackson 2.4.5 from tranquility -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_2.11</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
<!-- Override snakeyaml:1.11 from druid -->
<dependency>
<groupId>org.yaml</groupId>