NIFI-8501 Added Azure blob client side encryption

This closes #5078

Signed-off-by: Joey Frazee <jfrazee@apache.org>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index b79a632..675be64 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -22,6 +22,7 @@
     <properties>
         <azure-eventhubs.version>3.2.1</azure-eventhubs.version>
         <azure-eventhubs-eph.version>3.2.1</azure-eventhubs-eph.version>
+        <azure-keyvault.version>1.2.4</azure-keyvault.version>
     </properties>
     <dependencies>
         <dependency>
@@ -85,6 +86,11 @@
         </dependency>
         <dependency>
             <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-keyvault</artifactId>
+            <version>${azure-keyvault.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-eventhubs-eph</artifactId>
             <version>${azure-eventhubs-eph.version}</version>
             <exclusions>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
index 95070c7..9e2b855 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -16,13 +16,21 @@
  */
 package org.apache.nifi.processors.azure;
 
+import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
+import com.microsoft.azure.storage.blob.BlobEncryptionPolicy;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 
 import java.util.Arrays;
@@ -85,4 +93,24 @@
     public Set<Relationship> getRelationships() {
         return RELATIONSHIPS;
     }
+
+    protected BlobRequestOptions createBlobRequestOptions(ProcessContext context) throws DecoderException {
+        final String cseKeyTypeValue = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE).getValue();
+        final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
+
+        final String cseKeyId = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID).getValue();
+
+        final String cseSymmetricKeyHex = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX).getValue();
+
+        BlobRequestOptions blobRequestOptions = new BlobRequestOptions();
+
+        if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
+            byte[] keyBytes = Hex.decodeHex(cseSymmetricKeyHex.toCharArray());
+            SymmetricKey key = new SymmetricKey(cseKeyId, keyBytes);
+            BlobEncryptionPolicy policy = new BlobEncryptionPolicy(key, null);
+            blobRequestOptions.setEncryptionPolicy(policy);
+        }
+
+        return blobRequestOptions;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
index 11996f3..ba003e8 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -23,10 +23,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.microsoft.azure.storage.OperationContext;
+import org.apache.commons.codec.DecoderException;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -35,6 +37,8 @@
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.DataUnit;
@@ -43,12 +47,14 @@
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
 
 @Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
 @CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
@@ -80,10 +86,20 @@
             .build();
 
     @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+        results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
+        return results;
+    }
+
+    @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
+        List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(RANGE_START);
         properties.add(RANGE_LENGTH);
+        properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
+        properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
+        properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
         return properties;
     }
 
@@ -112,11 +128,13 @@
             final Map<String, String> attributes = new HashMap<>();
             final CloudBlob blob = container.getBlockBlobReference(blobPath);
 
+            BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context);
+
             // TODO - we may be able do fancier things with ranges and
             // distribution of download over threads, investigate
             flowFile = session.write(flowFile, os -> {
                 try {
-                    blob.downloadRange(rangeStart, rangeLength, os, null, null, operationContext);
+                    blob.downloadRange(rangeStart, rangeLength, os, null, blobRequestOptions, operationContext);
                 } catch (StorageException e) {
                     storedException.set(e);
                     throw new IOException(e);
@@ -133,7 +151,7 @@
             session.transfer(flowFile, REL_SUCCESS);
             final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
-        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
+        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) {
             if (e instanceof ProcessException && storedException.get() == null) {
                 throw (ProcessException) e;
             } else {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index 859f2be..c0e473e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -26,11 +26,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.azure.storage.OperationContext;
+import org.apache.commons.codec.DecoderException;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -39,6 +41,8 @@
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
@@ -46,6 +50,7 @@
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 
 import com.microsoft.azure.storage.StorageException;
@@ -53,6 +58,7 @@
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
 
 @Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
 @SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, DeleteAzureBlobStorage.class })
@@ -89,11 +95,21 @@
             .build();
 
     @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+        results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
+        return results;
+    }
+
+    @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.remove(BLOB);
         properties.add(BLOB_NAME);
         properties.add(CREATE_CONTAINER);
+        properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
+        properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
+        properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
         return properties;
     }
 
@@ -124,6 +140,8 @@
             final OperationContext operationContext = new OperationContext();
             AzureStorageUtils.setProxy(operationContext, context);
 
+            BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context);
+
             final Map<String, String> attributes = new HashMap<>();
             long length = flowFile.getSize();
             session.read(flowFile, rawIn -> {
@@ -142,7 +160,7 @@
                 }
 
                 try {
-                    uploadBlob(blob, operationContext, in);
+                    uploadBlob(blob, operationContext, blobRequestOptions, in);
                     BlobProperties properties = blob.getProperties();
                     attributes.put("azure.container", containerName);
                     attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
@@ -163,7 +181,7 @@
             final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
 
-        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
+        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) {
             if (e instanceof ProcessException && storedException.get() == null) {
                 throw (ProcessException) e;
             } else {
@@ -177,8 +195,8 @@
     }
 
     @VisibleForTesting
-    void uploadBlob(CloudBlob blob, OperationContext operationContext, InputStream in) throws StorageException, IOException {
-        blob.upload(in, -1, null, null, operationContext);
+    void uploadBlob(CloudBlob blob, OperationContext operationContext, BlobRequestOptions blobRequestOptions, InputStream in) throws StorageException, IOException {
+        blob.upload(in, -1, null, blobRequestOptions, operationContext);
     }
 
     // Used to help force Azure Blob SDK to write in blocks
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionMethod.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionMethod.java
new file mode 100644
index 0000000..f6e7419
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionMethod.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+/**
+ * Enumeration capturing essential information about the various client-side
+ * encryption methods supported by Azure
+ */
+public enum AzureBlobClientSideEncryptionMethod {
+
+    NONE("None", "The blobs sent to Azure are not encrypted."),
+    SYMMETRIC("Symmetric", "The blobs sent to Azure are encrypted using a symmetric algorithm.");
+
+    private final String cseName;
+    private final String description;
+
+    AzureBlobClientSideEncryptionMethod(String cseName, String description) {
+        this.cseName = cseName;
+        this.description = description;
+    }
+
+    public String getCseName() {
+        return cseName;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java
new file mode 100644
index 0000000..6e7a2ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+public class AzureBlobClientSideEncryptionUtils {
+
+    private static final String DEFAULT_KEY_ID = "nifi";
+
+    public static final PropertyDescriptor CSE_KEY_TYPE = new PropertyDescriptor.Builder()
+            .name("cse-key-type")
+            .displayName("Client-Side Encryption Key Type")
+            .required(true)
+            .allowableValues(buildCseEncryptionMethodAllowableValues())
+            .defaultValue(AzureBlobClientSideEncryptionMethod.NONE.name())
+            .description("Specifies the key type to use for client-side encryption.")
+            .build();
+
+    public static final PropertyDescriptor CSE_KEY_ID = new PropertyDescriptor.Builder()
+            .name("cse-key-id")
+            .displayName("Client-Side Encryption Key ID")
+            .description("Specifies the ID of the key to use for client-side encryption.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
+            .build();
+
+    public static final PropertyDescriptor CSE_SYMMETRIC_KEY_HEX = new PropertyDescriptor.Builder()
+            .name("cse-symmetric-key-hex")
+            .displayName("Symmetric Key")
+            .description("When using symmetric client-side encryption, this is the raw key, encoded in hexadecimal")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
+            .sensitive(true)
+            .build();
+
+    private static AllowableValue[] buildCseEncryptionMethodAllowableValues() {
+        return Arrays.stream(AzureBlobClientSideEncryptionMethod.values())
+            .map(v -> new AllowableValue(v.name(), v.name(), v.getDescription()))
+            .toArray(AllowableValue[]::new);
+    }
+
+    public static Collection<ValidationResult> validateClientSideEncryptionProperties(ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>();
+
+        final String cseKeyTypeValue = validationContext.getProperty(CSE_KEY_TYPE).getValue();
+        final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
+
+        final String cseKeyId = validationContext.getProperty(CSE_KEY_ID).getValue();
+
+        final String cseSymmetricKeyHex = validationContext.getProperty(CSE_SYMMETRIC_KEY_HEX).getValue();
+
+        if (cseKeyType != AzureBlobClientSideEncryptionMethod.NONE && StringUtils.isBlank(cseKeyId)) {
+            validationResults.add(new ValidationResult.Builder().subject(CSE_KEY_ID.getDisplayName())
+                    .explanation("a key ID must be set when client-side encryption is enabled.").build());
+        }
+
+        if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
+            validationResults.addAll(validateSymmetricKey(cseSymmetricKeyHex));
+        }
+
+        return validationResults;
+    }
+
+    private static List<ValidationResult> validateSymmetricKey(String keyHex) {
+        final List<ValidationResult> validationResults = new ArrayList<>();
+        if (StringUtils.isBlank(keyHex)) {
+            validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
+                    .explanation("a symmetric key must not be set when client-side encryption is enabled with symmetric encryption.").build());
+        } else {
+            byte[] keyBytes;
+            try {
+                keyBytes = Hex.decodeHex(keyHex.toCharArray());
+                new SymmetricKey(DEFAULT_KEY_ID, keyBytes);
+            } catch (DecoderException e) {
+                validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
+                        .explanation("the symmetric key must be a valid hexadecimal string.").build());
+            } catch (IllegalArgumentException e) {
+                validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
+                        .explanation(e.getMessage()).build());
+            }
+        }
+
+        return validationResults;
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java
new file mode 100644
index 0000000..bdf7687
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ComparisonFailure;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
+public class ITAzureBlobStorageE2E  {
+
+    private static final Properties CONFIG;
+
+    private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+
+    static {
+        CONFIG = new Properties();
+        try {
+            final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
+            try {
+                CONFIG.load(fis);
+            } catch (IOException e) {
+                fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+            } finally {
+                FileUtils.closeQuietly(fis);
+            }
+        } catch (FileNotFoundException e) {
+            fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+        }
+    }
+
+    protected static String getAccountName() {
+        return CONFIG.getProperty("accountName");
+    }
+
+    protected static String getAccountKey() {
+        return CONFIG.getProperty("accountKey");
+    }
+
+    protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
+    protected static final String TEST_BLOB_NAME = "nifi-test-blob";
+    protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
+
+    private static final String KEY_ID_VALUE = "key:id";
+    private static final String KEY_64B_VALUE = "1234567890ABCDEF";
+    private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
+    private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
+    private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
+    private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
+    private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
+
+    protected TestRunner putRunner;
+    protected TestRunner listRunner;
+    protected TestRunner fetchRunner;
+
+    protected CloudBlobContainer container;
+
+    @Before
+    public void setupRunners() throws Exception {
+        putRunner = TestRunners.newTestRunner(new PutAzureBlobStorage());
+        listRunner = TestRunners.newTestRunner(new ListAzureBlobStorage());
+        fetchRunner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
+
+        String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
+
+        StorageCredentials storageCredentials = new StorageCredentialsAccountAndKey(getAccountName(), getAccountKey());
+        CloudStorageAccount storageAccount = new CloudStorageAccount(storageCredentials, true);
+
+        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+        container = blobClient.getContainerReference(containerName);
+        container.createIfNotExists();
+
+        setRunnerProperties(putRunner, containerName);
+        setRunnerProperties(listRunner, containerName);
+        setRunnerProperties(fetchRunner, containerName);
+    }
+
+    private void setRunnerProperties(TestRunner runner, String containerName) {
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName());
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
+        runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
+    }
+
+    @After
+    public void tearDownAzureContainer() throws Exception {
+        container.deleteIfExists();
+    }
+
+    @Test
+    public void AzureBlobStorageE2ENoCSE() throws Exception {
+        testE2E(AzureBlobClientSideEncryptionMethod.NONE.name(),
+                null,
+                null,
+                AzureBlobClientSideEncryptionMethod.NONE.name(),
+                null,
+                null
+        );
+    }
+
+    @Test
+    public void AzureBlobStorageE2E128BCSE() throws Exception {
+        testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_128B_VALUE,
+                AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_128B_VALUE
+        );
+    }
+
+    @Test
+    public void AzureBlobStorageE2E192BCSE() throws Exception {
+        testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_192B_VALUE,
+                AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_192B_VALUE
+        );
+    }
+
+    @Test
+    public void AzureBlobStorageE2E256BCSE() throws Exception {
+        testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_256B_VALUE,
+                AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_256B_VALUE
+        );
+    }
+
+    @Test
+    public void AzureBlobStorageE2E384BCSE() throws Exception {
+        testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_384B_VALUE,
+                AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_384B_VALUE
+        );
+    }
+
+    @Test
+    public void AzureBlobStorageE2E512BCSE() throws Exception {
+        testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_512B_VALUE,
+                AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_512B_VALUE
+        );
+    }
+
+    @Test(expected = ComparisonFailure.class)
+    public void AzureBlobStorageE2E128BCSENoDecryption() throws Exception {
+        testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
+                KEY_ID_VALUE,
+                KEY_128B_VALUE,
+                AzureBlobClientSideEncryptionMethod.NONE.name(),
+                KEY_ID_VALUE,
+                KEY_128B_VALUE
+        );
+    }
+
+    private void testE2E(String encryptionKeyType, String encryptionKeyId, String encryptionKeyHex, String decryptionKeyType, String decryptionKeyId, String decryptionKeyHex) throws Exception {
+        putRunner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
+        putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, encryptionKeyType);
+        if (encryptionKeyId == null || encryptionKeyId.isEmpty() || encryptionKeyId.trim().isEmpty()) {
+            putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
+        } else {
+            putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, encryptionKeyId);
+        }
+        if (encryptionKeyHex == null || encryptionKeyHex.isEmpty() || encryptionKeyHex.trim().isEmpty()) {
+            putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
+        } else {
+            putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, encryptionKeyHex);
+        }
+
+        putRunner.assertValid();
+        putRunner.enqueue(TEST_FILE_CONTENT.getBytes());
+        putRunner.run();
+        putRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
+
+        Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
+
+        listRunner.assertValid();
+        listRunner.run();
+        listRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
+
+        MockFlowFile entry = listRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
+        entry.assertAttributeEquals("mime.type", "application/octet-stream");
+
+        fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, decryptionKeyType);
+        if (decryptionKeyId == null || decryptionKeyId.isEmpty() || decryptionKeyId.trim().isEmpty()) {
+            fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
+        } else {
+            fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, decryptionKeyId);
+        }
+        if (decryptionKeyHex == null || decryptionKeyHex.isEmpty() || decryptionKeyHex.trim().isEmpty()) {
+            fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
+        } else {
+            fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, decryptionKeyHex);
+        }
+        fetchRunner.assertValid();
+        fetchRunner.enqueue(entry);
+        fetchRunner.run();
+        fetchRunner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
+        MockFlowFile fetchedEntry = fetchRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
+        fetchedEntry.assertContentEquals(TEST_FILE_CONTENT);
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
index 6d4fce4..49365b5 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
@@ -20,7 +20,6 @@
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.util.concurrent.TimeUnit;
 
 public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT {
@@ -34,11 +33,11 @@
     public void setUp() throws Exception {
         uploadTestBlob();
 
-        Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS));
+        Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
     }
 
     @Test
-    public void testListBlobs() {
+    public void testListBlobs() throws Exception {
         runner.assertValid();
         runner.run(1);
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
index 142b592..85644d4 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
@@ -18,6 +18,8 @@
 
 import com.microsoft.azure.storage.blob.ListBlobItem;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
+import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Before;
@@ -29,6 +31,16 @@
 
 public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
 
+    public static final String TEST_FILE_CONTENT = "0123456789";
+    private static final String KEY_ID_VALUE = "key:id";
+    private static final String KEY_64B_VALUE = "1234567890ABCDEF";
+    private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
+    private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
+    private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
+    private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
+    private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
+
+
     @Override
     protected Class<? extends Processor> getProcessorClass() {
         return PutAzureBlobStorage.class;
@@ -42,7 +54,75 @@
     @Test
     public void testPutBlob() throws Exception {
         runner.assertValid();
-        runner.enqueue("0123456789".getBytes());
+        runner.enqueue(TEST_FILE_CONTENT.getBytes());
+        runner.run();
+
+        assertResult();
+    }
+
+    @Test
+    public void testPutBlob64BSymmetricCSE() {
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_64B_VALUE);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testPutBlob128BSymmetricCSE() throws Exception {
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_128B_VALUE);
+        runner.assertValid();
+        runner.enqueue(TEST_FILE_CONTENT.getBytes());
+        runner.run();
+
+        assertResult();
+    }
+
+    @Test
+    public void testPutBlob192BSymmetricCSE() throws Exception {
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_192B_VALUE);
+        runner.assertValid();
+        runner.enqueue(TEST_FILE_CONTENT.getBytes());
+        runner.run();
+
+        assertResult();
+    }
+
+    @Test
+    public void testPutBlob256BSymmetricCSE() throws Exception {
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_256B_VALUE);
+        runner.assertValid();
+        runner.enqueue(TEST_FILE_CONTENT.getBytes());
+        runner.run();
+
+        assertResult();
+    }
+
+    @Test
+    public void testPutBlob384BSymmetricCSE() throws Exception {
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_384B_VALUE);
+        runner.assertValid();
+        runner.enqueue(TEST_FILE_CONTENT.getBytes());
+        runner.run();
+
+        assertResult();
+    }
+
+    @Test
+    public void testPutBlob512BSymmetricCSE() throws Exception {
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
+        runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_512B_VALUE);
+        runner.assertValid();
+        runner.enqueue(TEST_FILE_CONTENT.getBytes());
         runner.run();
 
         assertResult();
@@ -53,7 +133,7 @@
         configureCredentialsService();
 
         runner.assertValid();
-        runner.enqueue("0123456789".getBytes());
+        runner.enqueue(TEST_FILE_CONTENT.getBytes());
         runner.run();
 
         assertResult();
@@ -64,7 +144,7 @@
         runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid");
         runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "aW52YWxpZGludmFsaWQ=");
         runner.assertValid();
-        runner.enqueue("test".getBytes());
+        runner.enqueue(TEST_FILE_CONTENT.getBytes());
         runner.run();
 
         runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
@@ -74,7 +154,7 @@
         runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
         List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
         for (MockFlowFile flowFile : flowFilesForRelationship) {
-            flowFile.assertContentEquals("0123456789".getBytes());
+            flowFile.assertContentEquals(TEST_FILE_CONTENT.getBytes());
             flowFile.assertAttributeEquals("azure.length", "10");
         }
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
index b01264a..bd3f222 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
@@ -32,7 +32,7 @@
     @Test
     public void testIOExceptionDuringUploadTransfersToFailure() throws Exception {
         PutAzureBlobStorage processor = Mockito.spy(new PutAzureBlobStorage());
-        doThrow(IOException.class).when(processor).uploadBlob(any(), any(), any());
+        doThrow(IOException.class).when(processor).uploadBlob(any(), any(), any(), any());
 
         TestRunner runner = TestRunners.newTestRunner(processor);
         runner.setProperty(PutAzureBlobStorage.BLOB, "test");
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java
new file mode 100644
index 0000000..ba7b2ca
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.PutAzureBlobStorage;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockValidationContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class TestAzureBlobClientSideEncryptionUtils {
+    private static final String KEY_ID_VALUE = "key:id";
+    private static final String KEY_64B_VALUE = "1234567890ABCDEF";
+    private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
+    private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
+    private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
+    private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
+    private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
+
+    private MockProcessContext processContext;
+    private MockValidationContext validationContext;
+
+    @Before
+    public void setUp() {
+        Processor processor = new PutAzureBlobStorage();
+        processContext = new MockProcessContext(processor);
+        validationContext = new MockValidationContext(processContext);
+    }
+
+    @Test
+    public void testNoCesConfiguredOnProcessor() {
+        configureProcessorProperties("NONE", null,null);
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertValid(result);
+    }
+
+    @Test
+    public void testSymmetricCesNoKeyIdOnProcessor() {
+        configureProcessorProperties("SYMMETRIC", null, KEY_128B_VALUE);
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertNotValid(result);
+    }
+
+    @Test
+    public void testSymmetricCesNoKeyOnProcessor() {
+        configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,null);
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertNotValid(result);
+    }
+
+    @Test
+    public void testSymmetricCesInvalidHexKeyOnProcessor() {
+        configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,"ZZ");
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertNotValid(result);
+    }
+
+    @Test
+    public void testSymmetricCes64BitKeyOnProcessor() {
+        configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_64B_VALUE);
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertNotValid(result);
+    }
+
+    @Test
+    public void testSymmetricCes128BitKeyOnProcessor() {
+        configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_128B_VALUE);
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertValid(result);
+    }
+
+    @Test
+    public void testSymmetricCes192BitKeyOnProcessor() {
+        configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_192B_VALUE);
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertValid(result);
+    }
+
+    @Test
+    public void testSymmetricCes256BitKeyOnProcessor() {
+        configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_256B_VALUE);
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertValid(result);
+    }
+
+    @Test
+    public void testSymmetricCes384BitKeyOnProcessor() {
+        configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_384B_VALUE);
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertValid(result);
+    }
+
+    @Test
+    public void testSymmetricCes512BitKeyOnProcessor() {
+        configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_512B_VALUE);
+
+        Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
+
+        assertValid(result);
+    }
+
+    private void configureProcessorProperties(String keyType, String keyId, String symmetricKeyHex) {
+        if (keyType != null) {
+            processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, keyType);
+        }
+        if (keyId != null) {
+            processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, keyId);
+        }
+        if (symmetricKeyHex != null) {
+            processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, symmetricKeyHex);
+        }
+    }
+
+    private void assertValid(Collection<ValidationResult> result) {
+        assertTrue("There should be no validation error", result.isEmpty());
+    }
+
+    private void assertNotValid(Collection<ValidationResult> result) {
+        assertFalse("There should be validation error", result.isEmpty());
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
index bdc360b..3ec7085 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java
@@ -116,10 +116,10 @@
 
     private void configureProcessorProperties(String accountName, String accountKey, String sasToken) {
         if (accountName != null) {
-            processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
+            processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, accountName);
         }
         if (accountKey != null) {
-            processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
+            processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, accountKey);
         }
         if (sasToken != null) {
             processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);