NIFI-8737: Fixed incorrect provenance data in HDFS processors when Directory property is inconsistent with core-site.xml
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 1cce996..2838a38 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -34,6 +34,7 @@
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
@@ -606,4 +607,26 @@
}
}
+ protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property) {
+ return getNormalizedPath(context, property, null);
+ }
+
+ protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property, FlowFile flowFile) {
+ final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
+ final Path path = new Path(propertyValue);
+ final URI uri = path.toUri();
+
+ final URI fileSystemUri = getFileSystem().getUri();
+
+ if (uri.getScheme() != null) {
+ if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) {
+ getLogger().warn("The filesystem component of the URI configured in the '{}' property ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " +
+ "and will be ignored.", property.getDisplayName(), uri, fileSystemUri);
+ }
+
+ return new Path(uri.getPath());
+ } else {
+ return path;
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
index 44fae65..b86b7dc 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
@@ -100,13 +100,12 @@
Path tempDotCopyFile = null;
FlowFile putFlowFile = flowFile;
try {
- final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
- final Path configuredRootDirPath = new Path(dirValue);
+ final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
- final long blockSize = getBlockSize(context, session, putFlowFile);
+ final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
final int bufferSize = getBufferSize(context, session, putFlowFile);
- final short replication = getReplication(context, session, putFlowFile);
+ final short replication = getReplication(context, session, putFlowFile, dirPath);
final CompressionCodec codec = getCompressionCodec(context, configuration);
@@ -114,19 +113,19 @@
? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
: putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
- final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
- final Path copyFile = new Path(configuredRootDirPath, filename);
+ final Path tempCopyFile = new Path(dirPath, "." + filename);
+ final Path copyFile = new Path(dirPath, filename);
// Create destination directory if it does not exist
try {
- if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
- throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
+ if (!hdfs.getFileStatus(dirPath).isDirectory()) {
+ throw new IOException(dirPath.toString() + " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
- if (!hdfs.mkdirs(configuredRootDirPath)) {
- throw new IOException(configuredRootDirPath.toString() + " could not be created");
+ if (!hdfs.mkdirs(dirPath)) {
+ throw new IOException(dirPath.toString() + " could not be created");
}
- changeOwner(context, hdfs, configuredRootDirPath, flowFile);
+ changeOwner(context, hdfs, dirPath, flowFile);
}
final boolean destinationExists = hdfs.exists(copyFile);
@@ -274,7 +273,7 @@
/**
* Returns with the expected block size.
*/
- protected abstract long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
+ protected abstract long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);
/**
* Returns with the expected buffer size.
@@ -284,7 +283,7 @@
/**
* Returns with the expected replication factor.
*/
- protected abstract short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
+ protected abstract short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);
/**
* Returns if file system should ignore locality.
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 7248e8f..33e762f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -179,7 +179,7 @@
FlowFile child = null;
final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
try {
- final Path path = new Path(filenameValue);
+ final Path path = getNormalizedPath(context, FILENAME, originalFlowFile);
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index 5ee54e3..a595128 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -273,10 +273,9 @@
FlowFile putFlowFile = flowFile;
try {
final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
- final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
// create the directory if it doesn't exist
- final Path directoryPath = new Path(directoryValue);
+ final Path directoryPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
// write to tempFile first and on success rename to destFile
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 8788085..9296507 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -145,7 +145,7 @@
// We need a FlowFile to report provenance correctly.
final FlowFile finalFlowFile = originalFlowFile != null ? originalFlowFile : session.create();
- final String fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(finalFlowFile).getValue();
+ final String fileOrDirectoryName = getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString();
final FileSystem fileSystem = getFileSystem();
final UserGroupInformation ugi = getUserGroupInformation();
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 26fd382..b60ee5b 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -125,7 +125,7 @@
final Path path;
try {
- path = new Path(filenameValue);
+ path = getNormalizedPath(context, FILENAME, flowFile);
} catch (IllegalArgumentException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index ba53377..3f08da0 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -345,7 +345,7 @@
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT);
- final Path rootDir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+ final Path rootDir = getNormalizedPath(context, DIRECTORY);
final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
@@ -427,7 +427,7 @@
if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
try {
final FileSystem hdfs = getFileSystem();
- final Path directoryPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+ final Path directoryPath = getNormalizedPath(context, DIRECTORY);
if (!hdfs.exists(directoryPath)) {
context.yield();
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
index f2864f0..8383732 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
@@ -582,7 +582,8 @@
*/
protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, FlowFile ff) {
HDFSFileInfoRequest req = new HDFSFileInfoRequest();
- req.setFullPath(context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue());
+ String fullPath = getNormalizedPath(context, FULL_PATH, ff).toString();
+ req.setFullPath(fullPath);
req.setRecursive(context.getProperty(RECURSE_SUBDIRS).asBoolean());
PropertyValue pv;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 02209e3..583b0b8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -402,8 +402,6 @@
}
lastRunTimestamp = now;
- final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
-
// Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
try {
final StateMap stateMap = session.getState(Scope.CLUSTER);
@@ -443,7 +441,7 @@
final Set<FileStatus> statuses;
try {
- final Path rootPath = new Path(directory);
+ final Path rootPath = getNormalizedPath(context, DIRECTORY);
statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
} catch (final IOException | IllegalArgumentException e) {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index 76a03d0..acb4c85 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.hadoop;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -245,7 +246,7 @@
Path inputPath;
try {
- inputPath = new Path(filenameValue);
+ inputPath = getNormalizedPath(context, INPUT_DIRECTORY_OR_FILE, flowFile);
if (!hdfs.exists(inputPath)) {
throw new IOException("Input Directory or File does not exist in HDFS");
}
@@ -348,9 +349,8 @@
FlowFile flowFile = session.create(parentFlowFile);
try {
final String originalFilename = file.getName();
- final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions(parentFlowFile).getValue();
- final Path configuredRootOutputDirPath = new Path(outputDirValue);
- final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
+ final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
+ final Path newFile = new Path(outputDirPath, originalFilename);
final boolean destinationExists = hdfs.exists(newFile);
// If destination file already exists, resolve that
// based on processor configuration
@@ -382,15 +382,15 @@
// Create destination directory if it does not exist
try {
- if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
- throw new IOException(configuredRootOutputDirPath.toString()
+ if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
+ throw new IOException(outputDirPath.toString()
+ " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
- if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
- throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
+ if (!hdfs.mkdirs(outputDirPath)) {
+ throw new IOException(outputDirPath.toString() + " could not be created");
}
- changeOwner(context, hdfs, configuredRootOutputDirPath);
+ changeOwner(context, hdfs, outputDirPath);
}
boolean moved = false;
@@ -419,8 +419,7 @@
final String hdfsPath = newFile.getParent().toString();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
- final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
- : "hdfs://" + outputPath;
+ final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/");
session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index fef0805..00942e3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -189,11 +189,9 @@
}
@Override
- protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
- final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final Path configuredRootDirPath = new Path(dirValue);
+ protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
- return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(configuredRootDirPath);
+ return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(dirPath);
}
@Override
@@ -203,12 +201,10 @@
}
@Override
- protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
- final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final Path configuredRootDirPath = new Path(dirValue);
+ protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
return replicationProp != null ? replicationProp.shortValue() : getFileSystem()
- .getDefaultReplication(configuredRootDirPath);
+ .getDefaultReplication(dirPath);
}
@Override
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index d60ba7f..0cd069e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.hadoop;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.FileResourceReference;
@@ -39,11 +41,14 @@
import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -207,4 +212,58 @@
runner.setProperty(kerberosProperties.getKerberosKeytab(), temporaryFile.getAbsolutePath());
runner.assertValid();
}
+
+ @Test
+ public void testGetNormalizedPathWithoutFileSystem() throws URISyntaxException {
+ AbstractHadoopProcessor processor = initProcessorForTestGetNormalizedPath("abfs://container1@storageaccount1");
+ TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor, "/dir1");
+
+ Path path = processor.getNormalizedPath(runner.getProcessContext(), AbstractHadoopProcessor.DIRECTORY);
+
+ assertEquals("/dir1", path.toString());
+ assertTrue(runner.getLogger().getWarnMessages().isEmpty());
+ }
+
+ @Test
+ public void testGetNormalizedPathWithCorrectFileSystem() throws URISyntaxException {
+ AbstractHadoopProcessor processor = initProcessorForTestGetNormalizedPath("abfs://container2@storageaccount2");
+ TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor, "abfs://container2@storageaccount2/dir2");
+
+ Path path = processor.getNormalizedPath(runner.getProcessContext(), AbstractHadoopProcessor.DIRECTORY);
+
+ assertEquals("/dir2", path.toString());
+ assertTrue(runner.getLogger().getWarnMessages().isEmpty());
+ }
+
+ @Test
+ public void testGetNormalizedPathWithIncorrectFileSystem() throws URISyntaxException {
+ AbstractHadoopProcessor processor = initProcessorForTestGetNormalizedPath("abfs://container3@storageaccount3");
+ TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor, "abfs://container*@storageaccount*/dir3");
+
+ Path path = processor.getNormalizedPath(runner.getProcessContext(), AbstractHadoopProcessor.DIRECTORY);
+
+ assertEquals("/dir3", path.toString());
+ assertFalse(runner.getLogger().getWarnMessages().isEmpty());
+ }
+
+ private AbstractHadoopProcessor initProcessorForTestGetNormalizedPath(String fileSystemUri) throws URISyntaxException {
+ final FileSystem fileSystem = mock(FileSystem.class);
+ when(fileSystem.getUri()).thenReturn(new URI(fileSystemUri));
+
+ final PutHDFS processor = new PutHDFS() {
+ @Override
+ protected FileSystem getFileSystem() {
+ return fileSystem;
+ }
+ };
+
+ return processor;
+ }
+
+ private TestRunner initTestRunnerForTestGetNormalizedPath(AbstractHadoopProcessor processor, String directory) throws URISyntaxException {
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(AbstractHadoopProcessor.DIRECTORY, directory);
+
+ return runner;
+ }
}