blob: bdf768783aa66333495fbf2d8ad5ddd05fa007cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ComparisonFailure;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.fail;
public class ITAzureBlobStorageE2E {
private static final Properties CONFIG;
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
static {
CONFIG = new Properties();
try {
final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
try {
CONFIG.load(fis);
} catch (IOException e) {
fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
} finally {
FileUtils.closeQuietly(fis);
}
} catch (FileNotFoundException e) {
fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
}
}
protected static String getAccountName() {
return CONFIG.getProperty("accountName");
}
protected static String getAccountKey() {
return CONFIG.getProperty("accountKey");
}
protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
protected static final String TEST_BLOB_NAME = "nifi-test-blob";
protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
private static final String KEY_ID_VALUE = "key:id";
private static final String KEY_64B_VALUE = "1234567890ABCDEF";
private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
protected TestRunner putRunner;
protected TestRunner listRunner;
protected TestRunner fetchRunner;
protected CloudBlobContainer container;
@Before
public void setupRunners() throws Exception {
putRunner = TestRunners.newTestRunner(new PutAzureBlobStorage());
listRunner = TestRunners.newTestRunner(new ListAzureBlobStorage());
fetchRunner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
StorageCredentials storageCredentials = new StorageCredentialsAccountAndKey(getAccountName(), getAccountKey());
CloudStorageAccount storageAccount = new CloudStorageAccount(storageCredentials, true);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
container = blobClient.getContainerReference(containerName);
container.createIfNotExists();
setRunnerProperties(putRunner, containerName);
setRunnerProperties(listRunner, containerName);
setRunnerProperties(fetchRunner, containerName);
}
private void setRunnerProperties(TestRunner runner, String containerName) {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
}
@After
public void tearDownAzureContainer() throws Exception {
container.deleteIfExists();
}
@Test
public void AzureBlobStorageE2ENoCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.NONE.name(),
null,
null,
AzureBlobClientSideEncryptionMethod.NONE.name(),
null,
null
);
}
@Test
public void AzureBlobStorageE2E128BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_128B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_128B_VALUE
);
}
@Test
public void AzureBlobStorageE2E192BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_192B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_192B_VALUE
);
}
@Test
public void AzureBlobStorageE2E256BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_256B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_256B_VALUE
);
}
@Test
public void AzureBlobStorageE2E384BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_384B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_384B_VALUE
);
}
@Test
public void AzureBlobStorageE2E512BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_512B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_512B_VALUE
);
}
@Test(expected = ComparisonFailure.class)
public void AzureBlobStorageE2E128BCSENoDecryption() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_128B_VALUE,
AzureBlobClientSideEncryptionMethod.NONE.name(),
KEY_ID_VALUE,
KEY_128B_VALUE
);
}
private void testE2E(String encryptionKeyType, String encryptionKeyId, String encryptionKeyHex, String decryptionKeyType, String decryptionKeyId, String decryptionKeyHex) throws Exception {
putRunner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, encryptionKeyType);
if (encryptionKeyId == null || encryptionKeyId.isEmpty() || encryptionKeyId.trim().isEmpty()) {
putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
} else {
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, encryptionKeyId);
}
if (encryptionKeyHex == null || encryptionKeyHex.isEmpty() || encryptionKeyHex.trim().isEmpty()) {
putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
} else {
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, encryptionKeyHex);
}
putRunner.assertValid();
putRunner.enqueue(TEST_FILE_CONTENT.getBytes());
putRunner.run();
putRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
listRunner.assertValid();
listRunner.run();
listRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
MockFlowFile entry = listRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
entry.assertAttributeEquals("mime.type", "application/octet-stream");
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, decryptionKeyType);
if (decryptionKeyId == null || decryptionKeyId.isEmpty() || decryptionKeyId.trim().isEmpty()) {
fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
} else {
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, decryptionKeyId);
}
if (decryptionKeyHex == null || decryptionKeyHex.isEmpty() || decryptionKeyHex.trim().isEmpty()) {
fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
} else {
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, decryptionKeyHex);
}
fetchRunner.assertValid();
fetchRunner.enqueue(entry);
fetchRunner.run();
fetchRunner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
MockFlowFile fetchedEntry = fetchRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
fetchedEntry.assertContentEquals(TEST_FILE_CONTENT);
}
}