NIFI-8501 Added Azure blob client side encryption
This closes #5078
Signed-off-by: Joey Frazee <jfrazee@apache.org>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index b79a632..675be64 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -22,6 +22,7 @@
<properties>
<azure-eventhubs.version>3.2.1</azure-eventhubs.version>
<azure-eventhubs-eph.version>3.2.1</azure-eventhubs-eph.version>
+ <azure-keyvault.version>1.2.4</azure-keyvault.version>
</properties>
<dependencies>
<dependency>
@@ -85,6 +86,11 @@
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-keyvault</artifactId>
+ <version>${azure-keyvault.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>${azure-eventhubs-eph.version}</version>
<exclusions>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
index 95070c7..9e2b855 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -16,13 +16,21 @@
*/
package org.apache.nifi.processors.azure;
+import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
+import com.microsoft.azure.storage.blob.BlobEncryptionPolicy;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.Arrays;
@@ -85,4 +93,24 @@
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
+
+ protected BlobRequestOptions createBlobRequestOptions(ProcessContext context) throws DecoderException {
+ final String cseKeyTypeValue = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE).getValue();
+ final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
+
+ final String cseKeyId = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID).getValue();
+
+ final String cseSymmetricKeyHex = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX).getValue();
+
+ BlobRequestOptions blobRequestOptions = new BlobRequestOptions();
+
+ if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
+ byte[] keyBytes = Hex.decodeHex(cseSymmetricKeyHex.toCharArray());
+ SymmetricKey key = new SymmetricKey(cseKeyId, keyBytes);
+ BlobEncryptionPolicy policy = new BlobEncryptionPolicy(key, null);
+ blobRequestOptions.setEncryptionPolicy(policy);
+ }
+
+ return blobRequestOptions;
+ }
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
index 11996f3..ba003e8 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -23,10 +23,12 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.microsoft.azure.storage.OperationContext;
+import org.apache.commons.codec.DecoderException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -35,6 +37,8 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
@@ -43,12 +47,14 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
@@ -80,10 +86,20 @@
.build();
@Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+ results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
+ return results;
+ }
+
+ @Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
+ List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(RANGE_START);
properties.add(RANGE_LENGTH);
+ properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
+ properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
+ properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
return properties;
}
@@ -112,11 +128,13 @@
final Map<String, String> attributes = new HashMap<>();
final CloudBlob blob = container.getBlockBlobReference(blobPath);
+ BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context);
+
// TODO - we may be able do fancier things with ranges and
// distribution of download over threads, investigate
flowFile = session.write(flowFile, os -> {
try {
- blob.downloadRange(rangeStart, rangeLength, os, null, null, operationContext);
+ blob.downloadRange(rangeStart, rangeLength, os, null, blobRequestOptions, operationContext);
} catch (StorageException e) {
storedException.set(e);
throw new IOException(e);
@@ -133,7 +151,7 @@
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
- } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
+ } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) {
if (e instanceof ProcessException && storedException.get() == null) {
throw (ProcessException) e;
} else {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index 859f2be..c0e473e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -26,11 +26,13 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.OperationContext;
+import org.apache.commons.codec.DecoderException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -39,6 +41,8 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@@ -46,6 +50,7 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import com.microsoft.azure.storage.StorageException;
@@ -53,6 +58,7 @@
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, DeleteAzureBlobStorage.class })
@@ -89,11 +95,21 @@
.build();
@Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+ results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
+ return results;
+ }
+
+ @Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.remove(BLOB);
properties.add(BLOB_NAME);
properties.add(CREATE_CONTAINER);
+ properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
+ properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
+ properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
return properties;
}
@@ -124,6 +140,8 @@
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
+ BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context);
+
final Map<String, String> attributes = new HashMap<>();
long length = flowFile.getSize();
session.read(flowFile, rawIn -> {
@@ -142,7 +160,7 @@
}
try {
- uploadBlob(blob, operationContext, in);
+ uploadBlob(blob, operationContext, blobRequestOptions, in);
BlobProperties properties = blob.getProperties();
attributes.put("azure.container", containerName);
attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
@@ -163,7 +181,7 @@
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
- } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
+ } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) {
if (e instanceof ProcessException && storedException.get() == null) {
throw (ProcessException) e;
} else {
@@ -177,8 +195,8 @@
}
@VisibleForTesting
- void uploadBlob(CloudBlob blob, OperationContext operationContext, InputStream in) throws StorageException, IOException {
- blob.upload(in, -1, null, null, operationContext);
+ void uploadBlob(CloudBlob blob, OperationContext operationContext, BlobRequestOptions blobRequestOptions, InputStream in) throws StorageException, IOException {
+ blob.upload(in, -1, null, blobRequestOptions, operationContext);
}
// Used to help force Azure Blob SDK to write in blocks
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionMethod.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionMethod.java
new file mode 100644
index 0000000..f6e7419
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionMethod.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+/**
+ * Enumeration capturing essential information about the various client-side
+ * encryption methods supported by Azure
+ */
+public enum AzureBlobClientSideEncryptionMethod {
+
+ NONE("None", "The blobs sent to Azure are not encrypted."),
+ SYMMETRIC("Symmetric", "The blobs sent to Azure are encrypted using a symmetric algorithm.");
+
+ private final String cseName;
+ private final String description;
+
+ AzureBlobClientSideEncryptionMethod(String cseName, String description) {
+ this.cseName = cseName;
+ this.description = description;
+ }
+
+ public String getCseName() {
+ return cseName;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java
new file mode 100644
index 0000000..6e7a2ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+public class AzureBlobClientSideEncryptionUtils {
+
+ private static final String DEFAULT_KEY_ID = "nifi";
+
+ public static final PropertyDescriptor CSE_KEY_TYPE = new PropertyDescriptor.Builder()
+ .name("cse-key-type")
+ .displayName("Client-Side Encryption Key Type")
+ .required(true)
+ .allowableValues(buildCseEncryptionMethodAllowableValues())
+ .defaultValue(AzureBlobClientSideEncryptionMethod.NONE.name())
+ .description("Specifies the key type to use for client-side encryption.")
+ .build();
+
+ public static final PropertyDescriptor CSE_KEY_ID = new PropertyDescriptor.Builder()
+ .name("cse-key-id")
+ .displayName("Client-Side Encryption Key ID")
+ .description("Specifies the ID of the key to use for client-side encryption.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
+ .build();
+
+ public static final PropertyDescriptor CSE_SYMMETRIC_KEY_HEX = new PropertyDescriptor.Builder()
+ .name("cse-symmetric-key-hex")
+ .displayName("Symmetric Key")
+ .description("When using symmetric client-side encryption, this is the raw key, encoded in hexadecimal")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
+ .sensitive(true)
+ .build();
+
+ private static AllowableValue[] buildCseEncryptionMethodAllowableValues() {
+ return Arrays.stream(AzureBlobClientSideEncryptionMethod.values())
+ .map(v -> new AllowableValue(v.name(), v.name(), v.getDescription()))
+ .toArray(AllowableValue[]::new);
+ }
+
+ public static Collection<ValidationResult> validateClientSideEncryptionProperties(ValidationContext validationContext) {
+ final List<ValidationResult> validationResults = new ArrayList<>();
+
+ final String cseKeyTypeValue = validationContext.getProperty(CSE_KEY_TYPE).getValue();
+ final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
+
+ final String cseKeyId = validationContext.getProperty(CSE_KEY_ID).getValue();
+
+ final String cseSymmetricKeyHex = validationContext.getProperty(CSE_SYMMETRIC_KEY_HEX).getValue();
+
+ if (cseKeyType != AzureBlobClientSideEncryptionMethod.NONE && StringUtils.isBlank(cseKeyId)) {
+ validationResults.add(new ValidationResult.Builder().subject(CSE_KEY_ID.getDisplayName())
+ .explanation("a key ID must be set when client-side encryption is enabled.").build());
+ }
+
+ if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
+ validationResults.addAll(validateSymmetricKey(cseSymmetricKeyHex));
+ }
+
+ return validationResults;
+ }
+
+ private static List<ValidationResult> validateSymmetricKey(String keyHex) {
+ final List<ValidationResult> validationResults = new ArrayList<>();
+ if (StringUtils.isBlank(keyHex)) {
+ validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
+ .explanation("a symmetric key must not be set when client-side encryption is enabled with symmetric encryption.").build());
+ } else {
+ byte[] keyBytes;
+ try {
+ keyBytes = Hex.decodeHex(keyHex.toCharArray());
+ new SymmetricKey(DEFAULT_KEY_ID, keyBytes);
+ } catch (DecoderException e) {
+ validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
+ .explanation("the symmetric key must be a valid hexadecimal string.").build());
+ } catch (IllegalArgumentException e) {
+ validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
+ .explanation(e.getMessage()).build());
+ }
+ }
+
+ return validationResults;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java
new file mode 100644
index 0000000..bdf7687
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ComparisonFailure;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
+public class ITAzureBlobStorageE2E {
+
+ private static final Properties CONFIG;
+
+ private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+
+ static {
+ CONFIG = new Properties();
+ try {
+ final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
+ try {
+ CONFIG.load(fis);
+ } catch (IOException e) {
+ fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+ } finally {
+ FileUtils.closeQuietly(fis);
+ }
+ } catch (FileNotFoundException e) {
+ fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+ }
+ }
+
+ protected static String getAccountName() {
+ return CONFIG.getProperty("accountName");
+ }
+
+ protected static String getAccountKey() {
+ return CONFIG.getProperty("accountKey");
+ }
+
+ protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
+ protected static final String TEST_BLOB_NAME = "nifi-test-blob";
+ protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
+
+ private static final String KEY_ID_VALUE = "key:id";
+ private static final String KEY_64B_VALUE = "1234567890ABCDEF";
+ private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
+ private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
+ private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
+ private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
+ private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
+
+ protected TestRunner putRunner;
+ protected TestRunner listRunner;
+ protected TestRunner fetchRunner;
+
+ protected CloudBlobContainer container;
+
+ @Before
+ public void setupRunners() throws Exception {
+ putRunner = TestRunners.newTestRunner(new PutAzureBlobStorage());
+ listRunner = TestRunners.newTestRunner(new ListAzureBlobStorage());
+ fetchRunner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
+
+ String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
+
+ StorageCredentials storageCredentials = new StorageCredentialsAccountAndKey(getAccountName(), getAccountKey());
+ CloudStorageAccount storageAccount = new CloudStorageAccount(storageCredentials, true);
+
+ CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+ container = blobClient.getContainerReference(containerName);
+ container.createIfNotExists();
+
+ setRunnerProperties(putRunner, containerName);
+ setRunnerProperties(listRunner, containerName);
+ setRunnerProperties(fetchRunner, containerName);
+ }
+
+ private void setRunnerProperties(TestRunner runner, String containerName) {
+ runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName());
+ runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
+ runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
+ }
+
+ @After
+ public void tearDownAzureContainer() throws Exception {
+ container.deleteIfExists();
+ }
+
+ @Test
+ public void AzureBlobStorageE2ENoCSE() throws Exception {
+ testE2E(AzureBlobClientSideEncryptionMethod.NONE.name(),
+ null,
+ null,
+ AzureBlobClientSideEncryptionMethod.NONE.name(),
+ null,
+ null
+ );
+ }
+
+ @Test
+ public void AzureBlobStorageE2E128BCSE() throws Exception {
+ testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_128B_VALUE,
+ AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_128B_VALUE
+ );
+ }
+
+ @Test
+ public void AzureBlobStorageE2E192BCSE() throws Exception {
+ testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_192B_VALUE,
+ AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_192B_VALUE
+ );
+ }
+
+ @Test
+ public void AzureBlobStorageE2E256BCSE() throws Exception {
+ testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_256B_VALUE,
+ AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_256B_VALUE
+ );
+ }
+
+ @Test
+ public void AzureBlobStorageE2E384BCSE() throws Exception {
+ testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_384B_VALUE,
+ AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_384B_VALUE
+ );
+ }
+
+ @Test
+ public void AzureBlobStorageE2E512BCSE() throws Exception {
+ testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_512B_VALUE,
+ AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_512B_VALUE
+ );
+ }
+
+ @Test(expected = ComparisonFailure.class)
+ public void AzureBlobStorageE2E128BCSENoDecryption() throws Exception {
+ testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+ KEY_ID_VALUE,
+ KEY_128B_VALUE,
+ AzureBlobClientSideEncryptionMethod.NONE.name(),
+ KEY_ID_VALUE,
+ KEY_128B_VALUE
+ );
+ }
+
+ private void testE2E(String encryptionKeyType, String encryptionKeyId, String encryptionKeyHex, String decryptionKeyType, String decryptionKeyId, String decryptionKeyHex) throws Exception {
+ putRunner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
+ putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, encryptionKeyType);
+ if (encryptionKeyId == null || encryptionKeyId.isEmpty() || encryptionKeyId.trim().isEmpty()) {
+ putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
+ } else {
+ putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, encryptionKeyId);
+ }
+ if (encryptionKeyHex == null || encryptionKeyHex.isEmpty() || encryptionKeyHex.trim().isEmpty()) {
+ putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
+ } else {
+ putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, encryptionKeyHex);
+ }
+
+ putRunner.assertValid();
+ putRunner.enqueue(TEST_FILE_CONTENT.getBytes());
+ putRunner.run();
+ putRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
+
+ Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
+
+ listRunner.assertValid();
+ listRunner.run();
+ listRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
+
+ MockFlowFile entry = listRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
+ entry.assertAttributeEquals("mime.type", "application/octet-stream");
+
+ fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, decryptionKeyType);
+ if (decryptionKeyId == null || decryptionKeyId.isEmpty() || decryptionKeyId.trim().isEmpty()) {
+ fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
+ } else {
+ fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, decryptionKeyId);
+ }
+ if (decryptionKeyHex == null || decryptionKeyHex.isEmpty() || decryptionKeyHex.trim().isEmpty()) {
+ fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
+ } else {
+ fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, decryptionKeyHex);
+ }
+ fetchRunner.assertValid();
+ fetchRunner.enqueue(entry);
+ fetchRunner.run();
+ fetchRunner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
+ MockFlowFile fetchedEntry = fetchRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
+ fetchedEntry.assertContentEquals(TEST_FILE_CONTENT);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
index 6d4fce4..49365b5 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
@@ -20,7 +20,6 @@
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;
-
import java.util.concurrent.TimeUnit;
public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT {
@@ -34,11 +33,11 @@
public void setUp() throws Exception {
uploadTestBlob();
- Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS));
+ Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
}
@Test
- public void testListBlobs() {
+ public void testListBlobs() throws Exception {
runner.assertValid();
runner.run(1);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
index 142b592..85644d4 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
@@ -18,6 +18,8 @@
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
@@ -29,6 +31,16 @@
public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
+ public static final String TEST_FILE_CONTENT = "0123456789";
+ private static final String KEY_ID_VALUE = "key:id";
+ private static final String KEY_64B_VALUE = "1234567890ABCDEF";
+ private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
+ private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
+ private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
+ private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
+ private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
+
+
@Override
protected Class<? extends Processor> getProcessorClass() {
return PutAzureBlobStorage.class;
@@ -42,7 +54,75 @@
@Test
public void testPutBlob() throws Exception {
runner.assertValid();
- runner.enqueue("0123456789".getBytes());
+ runner.enqueue(TEST_FILE_CONTENT.getBytes());
+ runner.run();
+
+ assertResult();
+ }
+
+ @Test
+ public void testPutBlob64BSymmetricCSE() {
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_64B_VALUE);
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testPutBlob128BSymmetricCSE() throws Exception {
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_128B_VALUE);
+ runner.assertValid();
+ runner.enqueue(TEST_FILE_CONTENT.getBytes());
+ runner.run();
+
+ assertResult();
+ }
+
+ @Test
+ public void testPutBlob192BSymmetricCSE() throws Exception {
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_192B_VALUE);
+ runner.assertValid();
+ runner.enqueue(TEST_FILE_CONTENT.getBytes());
+ runner.run();
+
+ assertResult();
+ }
+
+ @Test
+ public void testPutBlob256BSymmetricCSE() throws Exception {
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_256B_VALUE);
+ runner.assertValid();
+ runner.enqueue(TEST_FILE_CONTENT.getBytes());
+ runner.run();
+
+ assertResult();
+ }
+
+ @Test
+ public void testPutBlob384BSymmetricCSE() throws Exception {
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_384B_VALUE);
+ runner.assertValid();
+ runner.enqueue(TEST_FILE_CONTENT.getBytes());
+ runner.run();
+
+ assertResult();
+ }
+
+ @Test
+ public void testPutBlob512BSymmetricCSE() throws Exception {
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_512B_VALUE);
+ runner.assertValid();
+ runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
assertResult();
@@ -53,7 +133,7 @@
configureCredentialsService();
runner.assertValid();
- runner.enqueue("0123456789".getBytes());
+ runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
assertResult();
@@ -64,7 +144,7 @@
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "aW52YWxpZGludmFsaWQ=");
runner.assertValid();
- runner.enqueue("test".getBytes());
+ runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
@@ -74,7 +154,7 @@
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
- flowFile.assertContentEquals("0123456789".getBytes());
+ flowFile.assertContentEquals(TEST_FILE_CONTENT.getBytes());
flowFile.assertAttributeEquals("azure.length", "10");
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
index b01264a..bd3f222 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
@@ -32,7 +32,7 @@
@Test
public void testIOExceptionDuringUploadTransfersToFailure() throws Exception {
PutAzureBlobStorage processor = Mockito.spy(new PutAzureBlobStorage());
- doThrow(IOException.class).when(processor).uploadBlob(any(), any(), any());
+ doThrow(IOException.class).when(processor).uploadBlob(any(), any(), any(), any());
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutAzureBlobStorage.BLOB, "test");
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java
new file mode 100644
index 0000000..ba7b2ca
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.PutAzureBlobStorage;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockValidationContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class TestAzureBlobClientSideEncryptionUtils {
+ private static final String KEY_ID_VALUE = "key:id";
+ private static final String KEY_64B_VALUE = "1234567890ABCDEF";
+ private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
+ private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
+ private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
+ private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
+ private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
+
+ private MockProcessContext processContext;
+ private MockValidationContext validationContext;
+
+ @Before
+ public void setUp() {
+ Processor processor = new PutAzureBlobStorage();
+ processContext = new MockProcessContext(processor);
+ validationContext = new MockValidationContext(processContext);
+ }
+
+ @Test
+ public void testNoCesConfiguredOnProcessor() {
+ configureProcessorProperties("NONE", null,null);
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testSymmetricCesNoKeyIdOnProcessor() {
+ configureProcessorProperties("SYMMETRIC", null, KEY_128B_VALUE);
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertNotValid(result);
+ }
+
+ @Test
+ public void testSymmetricCesNoKeyOnProcessor() {
+ configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,null);
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertNotValid(result);
+ }
+
+ @Test
+ public void testSymmetricCesInvalidHexKeyOnProcessor() {
+ configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,"ZZ");
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertNotValid(result);
+ }
+
+ @Test
+ public void testSymmetricCes64BitKeyOnProcessor() {
+ configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_64B_VALUE);
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertNotValid(result);
+ }
+
+ @Test
+ public void testSymmetricCes128BitKeyOnProcessor() {
+ configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_128B_VALUE);
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testSymmetricCes192BitKeyOnProcessor() {
+ configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_192B_VALUE);
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testSymmetricCes256BitKeyOnProcessor() {
+ configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_256B_VALUE);
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testSymmetricCes384BitKeyOnProcessor() {
+ configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_384B_VALUE);
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testSymmetricCes512BitKeyOnProcessor() {
+ configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_512B_VALUE);
+
+ Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ private void configureProcessorProperties(String keyType, String keyId, String symmetricKeyHex) {
+ if (keyType != null) {
+ processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, keyType);
+ }
+ if (keyId != null) {
+ processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, keyId);
+ }
+ if (symmetricKeyHex != null) {
+ processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, symmetricKeyHex);
+ }
+ }
+
+ private void assertValid(Collection<ValidationResult> result) {
+ assertTrue("There should be no validation error", result.isEmpty());
+ }
+
+ private void assertNotValid(Collection<ValidationResult> result) {
+ assertFalse("There should be validation error", result.isEmpty());
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
index bdc360b..3ec7085 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
@@ -116,10 +116,10 @@
private void configureProcessorProperties(String accountName, String accountKey, String sasToken) {
if (accountName != null) {
- processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
+ processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, accountName);
}
if (accountKey != null) {
- processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
+ processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, accountKey);
}
if (sasToken != null) {
processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);