/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslPlainServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import javax.net.SocketFactory;
import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
/**
* This is a base class that is helpful when building processors interacting with HDFS.
* <p/>
* As of Apache NiFi 1.5.0, the Relogin Period property is no longer used in the configuration of a Hadoop processor.
* Due to changes made to {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
* class to authenticate a principal with Kerberos, Hadoop components no longer
* attempt relogins explicitly. For more information, please read the documentation for
* {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
*
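* An illustrative minimal subclass (the class and member names are hypothetical, not part of this API):
* <pre>
* public class MyHdfsProcessor extends AbstractHadoopProcessor {
*     {@literal @}Override
*     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
*         // the shared Configuration, FileSystem, and UserGroupInformation are initialized by abstractOnScheduled
*         final FileSystem fs = getFileSystem();
*         // ... read from or write to HDFS ...
*     }
* }
* </pre>
*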
* @see SecurityUtil#loginKerberos(Configuration, String, String)
*/
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
private static final String DENY_LFS_ACCESS = "NIFI_HDFS_DENY_LOCAL_FILE_SYSTEM_ACCESS";
private static final String DENY_LFS_EXPLANATION = String.format("LFS Access Denied according to Environment Variable [%s]", DENY_LFS_ACCESS);
private static final Pattern LOCAL_FILE_SYSTEM_URI = Pattern.compile("^file:.*");
// properties
public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
.name("Hadoop Configuration Resources")
.description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
+ "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration. "
+ "To use swebhdfs, see 'Additional Details' section of PutHDFS's documentation.")
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("Directory")
.description("The HDFS directory from which files should be read")
.required(true)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("Compression codec")
.required(true)
.allowableValues(CompressionType.allowableValues())
.defaultValue(CompressionType.NONE.toString())
.build();
/*
* TODO This property has been deprecated, remove for NiFi 2.0
*/
public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
.name("Kerberos Relogin Period")
.required(false)
.description("Period of time which should pass before attempting a kerberos relogin.\n\nThis property has been deprecated, and has no effect on processing. " +
"Relogins now occur automatically.")
.defaultValue("4 hours")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
.name("Additional Classpath Resources")
.description("A comma-separated list of paths to files and/or directories that will be added to the classpath and used for loading native libraries. " +
"When specifying a directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.")
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
.dynamicallyModifiesClasspath(true)
.build();
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
private static final Object RESOURCES_LOCK = new Object();
private static final HdfsResources EMPTY_HDFS_RESOURCES = new HdfsResources(null, null, null, null);
protected KerberosProperties kerberosProperties;
protected List<PropertyDescriptor> properties;
private volatile File kerberosConfigFile = null;
// variables shared by all threads of this processor
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
// Holder of cached Configuration information so validation does not reload the same config over and over
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
@Override
protected void init(ProcessorInitializationContext context) {
hdfsResources.set(EMPTY_HDFS_RESOURCES);
kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosProperties = getKerberosProperties(kerberosConfigFile);
List<PropertyDescriptor> props = new ArrayList<>();
props.add(HADOOP_CONFIGURATION_RESOURCES);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
props.add(kerberosProperties.getKerberosPassword());
props.add(KERBEROS_RELOGIN_PERIOD);
props.add(ADDITIONAL_CLASSPATH_RESOURCES);
properties = Collections.unmodifiableList(props);
}
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return new KerberosProperties(kerberosConfigFile);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
if (credentialsService == null) {
resolvedPrincipal = explicitPrincipal;
resolvedKeytab = explicitKeytab;
} else {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
}
final List<ValidationResult> results = new ArrayList<>();
final List<String> locations = getConfigLocations(validationContext);
if (locations.isEmpty()) {
return results;
}
try {
final Configuration conf = getHadoopConfigurationForValidation(locations);
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
results.addAll(validateFileSystem(conf));
} catch (final IOException e) {
results.add(new ValidationResult.Builder()
.valid(false)
.subject("Hadoop Configuration Resources")
.explanation("Could not load Hadoop Configuration resources due to: " + e)
.build());
}
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
results.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
.explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
.build());
}
if (!isAllowExplicitKeytab() && explicitKeytab != null) {
results.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. "
+ "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
.build());
}
return results;
}
protected Collection<ValidationResult> validateFileSystem(final Configuration configuration) {
final List<ValidationResult> results = new ArrayList<>();
if (isFileSystemAccessDenied(FileSystem.getDefaultUri(configuration))) {
results.add(new ValidationResult.Builder()
.valid(false)
.subject("Hadoop File System")
.explanation(DENY_LFS_EXPLANATION)
.build());
}
return results;
}
protected Configuration getHadoopConfigurationForValidation(final List<String> locations) throws IOException {
ValidationResources resources = validationResourceHolder.get();
// if no resources in the holder, or if the holder has different resources loaded,
// then load the Configuration and set the new resources in the holder
if (resources == null || !locations.equals(resources.getConfigLocations())) {
getLogger().debug("Reloading validation resources");
final Configuration config = new ExtendedConfiguration(getLogger());
config.setClassLoader(Thread.currentThread().getContextClassLoader());
resources = new ValidationResources(locations, getConfigurationFromResources(config, locations));
validationResourceHolder.set(resources);
}
return resources.getConfiguration();
}
/**
* If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
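* <p/>
* An illustrative pattern (the subclass method name is hypothetical):
* <pre>
* {@literal @}OnScheduled
* public void onScheduled(final ProcessContext context) throws IOException {
*     super.abstractOnScheduled(context);
*     // hdfsResources (Configuration, FileSystem, UserGroupInformation) are now initialized
*     final FileSystem fs = getFileSystem();
* }
* </pre>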
*/
@OnScheduled
public final void abstractOnScheduled(ProcessContext context) throws IOException {
try {
// This value will be null when called from ListHDFS, because it overrides all of the default
// properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
HdfsResources resources = hdfsResources.get();
if (resources.getConfiguration() == null) {
resources = resetHDFSResources(getConfigLocations(context), context);
hdfsResources.set(resources);
}
} catch (Exception ex) {
getLogger().error("HDFS Configuration error - {}", new Object[]{ex});
hdfsResources.set(EMPTY_HDFS_RESOURCES);
throw ex;
}
}
protected List<String> getConfigLocations(PropertyContext context) {
final ResourceReferences configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
final List<String> locations = configResources.asLocations();
return locations;
}
@OnStopped
public final void abstractOnStopped() {
final HdfsResources resources = hdfsResources.get();
if (resources != null) {
// Attempt to close the FileSystem
final FileSystem fileSystem = resources.getFileSystem();
try {
interruptStatisticsThread(fileSystem);
} catch (Exception e) {
getLogger().warn("Error stopping FileSystem statistics thread: " + e.getMessage());
getLogger().debug("", e);
} finally {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
getLogger().warn("Error close FileSystem: " + e.getMessage(), e);
}
}
}
// Clean-up the static reference to the Configuration instance
UserGroupInformation.setConfiguration(new Configuration());
// Clean-up the reference to the InstanceClassLoader that was put into Configuration
final Configuration configuration = resources.getConfiguration();
if (configuration != null) {
configuration.setClassLoader(null);
}
// Need to remove the Provider instance from the JVM's Providers class so that InstanceClassLoader can be GC'd eventually
final SaslPlainServer.SecurityProvider saslProvider = new SaslPlainServer.SecurityProvider();
Security.removeProvider(saslProvider.getName());
}
// Clear out the reference to the resources
hdfsResources.set(EMPTY_HDFS_RESOURCES);
}
private void interruptStatisticsThread(final FileSystem fileSystem) throws NoSuchFieldException, IllegalAccessException {
final Field statsField = FileSystem.class.getDeclaredField("statistics");
statsField.setAccessible(true);
final Object statsObj = statsField.get(fileSystem);
if (statsObj instanceof FileSystem.Statistics) {
final FileSystem.Statistics statistics = (FileSystem.Statistics) statsObj;
final Field statsThreadField = statistics.getClass().getDeclaredField("STATS_DATA_CLEANER");
statsThreadField.setAccessible(true);
final Object statsThreadObj = statsThreadField.get(statistics);
if (statsThreadObj instanceof Thread) {
final Thread statsThread = (Thread) statsThreadObj;
try {
statsThread.interrupt();
} catch (Exception e) {
getLogger().warn("Error interrupting thread: " + e.getMessage(), e);
}
}
}
}
private static Configuration getConfigurationFromResources(final Configuration config, final List<String> locations) throws IOException {
boolean foundResources = !locations.isEmpty();
if (foundResources) {
for (String resource : locations) {
config.addResource(new Path(resource.trim()));
}
} else {
// check that at least 1 non-default resource is available on the classpath
String configStr = config.toString();
for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
foundResources = true;
break;
}
}
}
if (!foundResources) {
throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
}
return config;
}
/*
* Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
*/
HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
Configuration config = new ExtendedConfiguration(getLogger());
config.setClassLoader(Thread.currentThread().getContextClassLoader());
getConfigurationFromResources(config, resourceLocations);
// give sub-classes a chance to process configuration
preProcessConfiguration(config, context);
// first check for timeout on the HDFS connection, because FileSystem has a hard-coded 15-minute timeout
checkHdfsUriForTimeout(config);
// disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
// restart
String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
config.set(disableCacheName, "true");
// If kerberos is enabled, create the file system as the kerberos principal
// -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
FileSystem fs;
UserGroupInformation ugi;
KerberosUser kerberosUser;
synchronized (RESOURCES_LOCK) {
if (SecurityUtil.isSecurityEnabled(config)) {
String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
String password = context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
// If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
// The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
if (credentialsService != null) {
principal = credentialsService.getPrincipal();
keyTab = credentialsService.getKeytab();
}
if (keyTab != null) {
kerberosUser = new KerberosKeytabUser(principal, keyTab);
} else if (password != null) {
kerberosUser = new KerberosPasswordUser(principal, password);
} else {
throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided");
}
ugi = SecurityUtil.getUgiForKerberosUser(config, kerberosUser);
} else {
config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
config.set("hadoop.security.authentication", "simple");
ugi = SecurityUtil.loginSimple(config);
kerberosUser = null;
}
fs = getFileSystemAsUser(config, ugi);
}
getLogger().debug("resetHDFSResources UGI [{}], KerberosUser [{}]", new Object[]{ugi, kerberosUser});
final Path workingDir = fs.getWorkingDirectory();
getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
return new HdfsResources(config, fs, ugi, kerberosUser);
}
/**
* This method will be called after the Configuration has been created, but before the FileSystem is created,
* allowing sub-classes to take further action on the Configuration before creating the FileSystem.
*
* @param config the Configuration that will be used to create the FileSystem
* @param context the context that can be used to retrieve additional values
*/
protected void preProcessConfiguration(final Configuration config, final ProcessContext context) {
}
/**
* This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received
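* <p/>
* For example, a test subclass might override it roughly as follows ({@code mockFileSystem} is a hypothetical test fixture):
* <pre>
* {@literal @}Override
* protected FileSystem getFileSystem(final Configuration config) {
*     return mockFileSystem;
* }
* </pre>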
*
* @param config
* the configuration to use
* @return the FileSystem that is created for the given Configuration
* @throws IOException
* if unable to create the FileSystem
*/
protected FileSystem getFileSystem(final Configuration config) throws IOException {
return FileSystem.get(config);
}
protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
try {
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(config);
}
});
} catch (InterruptedException e) {
throw new IOException("Unable to create file system: " + e.getMessage());
}
}
/*
* Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
*/
protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
URI hdfsUri = FileSystem.getDefaultUri(config);
String address = hdfsUri.getAuthority();
int port = hdfsUri.getPort();
if (address == null || address.isEmpty() || port < 0) {
return;
}
InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
Socket socket = null;
try {
socket = socketFactory.createSocket();
NetUtils.connect(socket, namenode, 1000); // 1 second timeout
} finally {
IOUtils.closeQuietly(socket);
}
}
/**
* Returns the configured CompressionCodec, or null if none is configured.
*
* @param context
* the ProcessContext
* @param configuration
* the Hadoop Configuration
* @return CompressionCodec or null
*/
protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
org.apache.hadoop.io.compress.CompressionCodec codec = null;
if (context.getProperty(COMPRESSION_CODEC).isSet()) {
String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
codec = ccf.getCodecByClassName(compressionClassname);
}
return codec;
}
/**
* Returns the relative path of the child that does not include the filename or the root path.
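* <p/>
* For example, {@code getPathDifference(new Path("/data"), new Path("/data/a/b/file.txt"))} returns {@code "a/b"},
* and the result is the empty string when the child is a direct descendant of the root.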
*
* @param root
* the path to relativize from
* @param child
* the path to relativize
* @return the relative path
*/
public static String getPathDifference(final Path root, final Path child) {
final int depthDiff = child.depth() - root.depth();
if (depthDiff <= 1) {
return "".intern();
}
String lastRoot = root.getName();
Path childsParent = child.getParent();
final StringBuilder builder = new StringBuilder();
builder.append(childsParent.getName());
for (int i = (depthDiff - 3); i >= 0; i--) {
childsParent = childsParent.getParent();
String name = childsParent.getName();
if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
break;
}
builder.insert(0, Path.SEPARATOR).insert(0, name);
}
return builder.toString();
}
protected Configuration getConfiguration() {
return hdfsResources.get().getConfiguration();
}
protected FileSystem getFileSystem() {
return hdfsResources.get().getFileSystem();
}
protected UserGroupInformation getUserGroupInformation() {
getLogger().trace("getting UGI instance");
if (hdfsResources.get().getKerberosUser() != null) {
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
KerberosUser kerberosUser = hdfsResources.get().getKerberosUser();
getLogger().debug("kerberosUser is " + kerberosUser);
try {
getLogger().debug("checking TGT on kerberosUser " + kerberosUser);
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
} else {
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
}
return hdfsResources.get().getUserGroupInformation();
}
/*
* Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables.
*/
boolean isAllowExplicitKeytab() {
return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
}
boolean isLocalFileSystemAccessDenied() {
return Boolean.parseBoolean(System.getenv(DENY_LFS_ACCESS));
}
protected boolean isFileSystemAccessDenied(final URI fileSystemUri) {
boolean accessDenied;
if (isLocalFileSystemAccessDenied()) {
accessDenied = LOCAL_FILE_SYSTEM_URI.matcher(fileSystemUri.toString()).matches();
} else {
accessDenied = false;
}
return accessDenied;
}
protected static class ValidationResources {
private final List<String> configLocations;
private final Configuration configuration;
public ValidationResources(final List<String> configLocations, final Configuration configuration) {
this.configLocations = configLocations;
this.configuration = configuration;
}
public List<String> getConfigLocations() {
return configLocations;
}
public Configuration getConfiguration() {
return configuration;
}
}
protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property) {
return getNormalizedPath(context, property, null);
}
protected Path getNormalizedPath(final String rawPath) {
final Path path = new Path(rawPath);
final URI uri = path.toUri();
final URI fileSystemUri = getFileSystem().getUri();
if (uri.getScheme() != null) {
if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) {
getLogger().warn("The filesystem component of the URI configured ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " +
"and will be ignored.", uri, fileSystemUri);
}
return new Path(uri.getPath());
} else {
return path;
}
}
protected Path getNormalizedPath(final ProcessContext context, final PropertyDescriptor property, final FlowFile flowFile) {
final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
final Path path = new Path(propertyValue);
final URI uri = path.toUri();
final URI fileSystemUri = getFileSystem().getUri();
if (uri.getScheme() != null) {
if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) {
getLogger().warn("The filesystem component of the URI configured in the '{}' property ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " +
"and will be ignored.", property.getDisplayName(), uri, fileSystemUri);
}
return new Path(uri.getPath());
} else {
return path;
}
}
}