NIFI-9032 Refactor HDFS processors to increase flexibility
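
This change removes AbstractPutHDFS (its logic moves back into PutHDFS) and turns the hard-coded
pieces of the HDFS processors - relationships, flow file attribute prefixes, path resolution,
recursion and compression handling - into overridable protected methods, so other bundles can
subclass them. A minimal sketch of a hypothetical subclass using the new hooks (FetchCustomFS and
the "customfs" prefix are illustrative only, not part of this change; imports are abridged):

    package org.apache.nifi.processors.hadoop;

    import org.apache.nifi.processor.ProcessContext;

    // Hypothetical subclass: renames the attribute prefix and forces
    // automatic compression codec inference.
    public class FetchCustomFS extends FetchHDFS {
        @Override
        protected String getAttributePrefix() {
            return "customfs"; // attributes become customfs.failure.reason etc.
        }

        @Override
        protected CompressionType getCompressionType(final ProcessContext context) {
            return CompressionType.AUTOMATIC;
        }
    }
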
This closes #5295.
Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 2838a38..57c57cf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -33,6 +33,7 @@
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
@@ -134,7 +135,7 @@
.dynamicallyModifiesClasspath(true)
.build();
- static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
@@ -187,7 +188,6 @@
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
- final ResourceReferences configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
@@ -204,36 +204,18 @@
}
final List<ValidationResult> results = new ArrayList<>();
+ final List<String> locations = getConfigLocations(validationContext);
- if (configResources.getCount() == 0) {
+ if (locations.isEmpty()) {
return results;
}
try {
- ValidationResources resources = validationResourceHolder.get();
-
- // if no resources in the holder, or if the holder has different resources loaded,
- // then load the Configuration and set the new resources in the holder
- if (resources == null || !configResources.equals(resources.getConfigResources())) {
- getLogger().debug("Reloading validation resources");
- final Configuration config = new ExtendedConfiguration(getLogger());
- config.setClassLoader(Thread.currentThread().getContextClassLoader());
- resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources));
- validationResourceHolder.set(resources);
- }
-
- final Configuration conf = resources.getConfiguration();
+ final Configuration conf = getHadoopConfigurationForValidation(locations);
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
- this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
+ this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
- final URI fileSystemUri = FileSystem.getDefaultUri(conf);
- if (isFileSystemAccessDenied(fileSystemUri)) {
- results.add(new ValidationResult.Builder()
- .valid(false)
- .subject("Hadoop File System")
- .explanation(DENY_LFS_EXPLANATION)
- .build());
- }
+ results.addAll(validateFileSystem(conf));
} catch (final IOException e) {
results.add(new ValidationResult.Builder()
.valid(false)
@@ -262,6 +244,36 @@
return results;
}
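+ /**
+ * Validates the file system described by the given Configuration. The default implementation only
+ * flags the configuration as invalid when the default file system is the local file system and
+ * local file system access is denied; subclasses may override it to add their own checks.
+ */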
+ protected Collection<ValidationResult> validateFileSystem(final Configuration configuration) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ if (isFileSystemAccessDenied(FileSystem.getDefaultUri(configuration))) {
+ results.add(new ValidationResult.Builder()
+ .valid(false)
+ .subject("Hadoop File System")
+ .explanation(DENY_LFS_EXPLANATION)
+ .build());
+ }
+
+ return results;
+ }
+
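+ /**
+ * Builds the Configuration used during validation from the given config file locations, reusing the
+ * cached instance in validationResourceHolder as long as the locations have not changed.
+ */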
+ protected Configuration getHadoopConfigurationForValidation(final List<String> locations) throws IOException {
+ ValidationResources resources = validationResourceHolder.get();
+
+ // if no resources in the holder, or if the holder has different resources loaded,
+ // then load the Configuration and set the new resources in the holder
+ if (resources == null || !locations.equals(resources.getConfigLocations())) {
+ getLogger().debug("Reloading validation resources");
+ final Configuration config = new ExtendedConfiguration(getLogger());
+ config.setClassLoader(Thread.currentThread().getContextClassLoader());
+ resources = new ValidationResources(locations, getConfigurationFromResources(config, locations));
+ validationResourceHolder.set(resources);
+ }
+
+ return resources.getConfiguration();
+ }
+
/**
* If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
*/
@@ -272,17 +284,22 @@
// properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
HdfsResources resources = hdfsResources.get();
if (resources.getConfiguration() == null) {
- final ResourceReferences configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
- resources = resetHDFSResources(configResources, context);
+ resources = resetHDFSResources(getConfigLocations(context), context);
hdfsResources.set(resources);
}
} catch (Exception ex) {
- getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
+ getLogger().error("HDFS Configuration error - {}", new Object[]{ex});
hdfsResources.set(EMPTY_HDFS_RESOURCES);
throw ex;
}
}
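+ /**
+ * Returns the locations of the configured Hadoop configuration resources. Subclasses may override
+ * this to resolve the configuration files from another source.
+ */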
+ protected List<String> getConfigLocations(PropertyContext context) {
+ final ResourceReferences configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
+ final List<String> locations = configResources.asLocations();
+ return locations;
+ }
+
@OnStopped
public final void abstractOnStopped() {
final HdfsResources resources = hdfsResources.get();
@@ -345,10 +362,10 @@
}
}
- private static Configuration getConfigurationFromResources(final Configuration config, final ResourceReferences resourceReferences) throws IOException {
- boolean foundResources = resourceReferences.getCount() > 0;
+ private static Configuration getConfigurationFromResources(final Configuration config, final List<String> locations) throws IOException {
+ boolean foundResources = !locations.isEmpty();
+
if (foundResources) {
- final List<String> locations = resourceReferences.asLocations();
for (String resource : locations) {
config.addResource(new Path(resource.trim()));
}
@@ -372,11 +389,11 @@
/*
* Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
*/
- HdfsResources resetHDFSResources(final ResourceReferences resourceReferences, ProcessContext context) throws IOException {
+ HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
Configuration config = new ExtendedConfiguration(getLogger());
config.setClassLoader(Thread.currentThread().getContextClassLoader());
- getConfigurationFromResources(config, resourceReferences);
+ getConfigurationFromResources(config, resourceLocations);
// give sub-classes a chance to process configuration
preProcessConfiguration(config, context);
@@ -559,7 +576,7 @@
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
- }
+ }
} else {
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
}
@@ -577,7 +594,7 @@
return Boolean.parseBoolean(System.getenv(DENY_LFS_ACCESS));
}
- private boolean isFileSystemAccessDenied(final URI fileSystemUri) {
+ protected boolean isFileSystemAccessDenied(final URI fileSystemUri) {
boolean accessDenied;
if (isLocalFileSystemAccessDenied()) {
@@ -590,16 +607,16 @@
}
static protected class ValidationResources {
- private final ResourceReferences configResources;
+ private final List<String> configLocations;
private final Configuration configuration;
- public ValidationResources(final ResourceReferences configResources, Configuration configuration) {
- this.configResources = configResources;
+ public ValidationResources(final List<String> configLocations, final Configuration configuration) {
+ this.configLocations = configLocations;
this.configuration = configuration;
}
- public ResourceReferences getConfigResources() {
- return configResources;
+ public List<String> getConfigLocations() {
+ return configLocations;
}
public Configuration getConfiguration() {
@@ -611,7 +628,25 @@
return getNormalizedPath(context, property, null);
}
- protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property, FlowFile flowFile) {
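+ /**
+ * Normalizes the given raw path: when it contains a scheme, the scheme and authority are dropped and
+ * a warning is logged if they differ from the file system URI of the Hadoop configuration.
+ */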
+ protected Path getNormalizedPath(final String rawPath) {
+ final Path path = new Path(rawPath);
+ final URI uri = path.toUri();
+
+ final URI fileSystemUri = getFileSystem().getUri();
+
+ if (uri.getScheme() != null) {
+ if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) {
+ getLogger().warn("The filesystem component of the URI configured ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " +
+ "and will be ignored.", uri, fileSystemUri);
+ }
+
+ return new Path(uri.getPath());
+ } else {
+ return path;
+ }
+ }
+
+ protected Path getNormalizedPath(final ProcessContext context, final PropertyDescriptor property, final FlowFile flowFile) {
final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
final Path path = new Path(propertyValue);
final URI uri = path.toUri();
@@ -629,4 +664,4 @@
return path;
}
}
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
deleted file mode 100644
index b86b7dc..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsCreateModes;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.StopWatch;
-import org.ietf.jgss.GSSException;
-
-import java.io.BufferedInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.security.PrivilegedAction;
-import java.util.EnumSet;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
-
-public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
- protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
- protected static final int BUFFER_SIZE_DEFAULT = 4096;
-
- protected static final String REPLACE_RESOLUTION = "replace";
- protected static final String IGNORE_RESOLUTION = "ignore";
- protected static final String FAIL_RESOLUTION = "fail";
- protected static final String APPEND_RESOLUTION = "append";
-
- protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
- REPLACE_RESOLUTION, "Replaces the existing file if any.");
- protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
- "Ignores the flow file and routes it to success.");
- protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
- "Penalizes the flow file and routes it to failure.");
- protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
- "Appends to the existing file if any, creates a new file otherwise.");
-
- protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
- .name("Conflict Resolution Strategy")
- .description("Indicates what should happen when a file with the same name already exists in the output directory")
- .required(true)
- .defaultValue(FAIL_RESOLUTION_AV.getValue())
- .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
- .build();
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
- final FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final FileSystem hdfs = getFileSystem();
- final Configuration configuration = getConfiguration();
- final UserGroupInformation ugi = getUserGroupInformation();
-
- if (configuration == null || hdfs == null || ugi == null) {
- getLogger().error("HDFS not configured properly");
- session.transfer(flowFile, getFailureRelationship());
- context.yield();
- return;
- }
-
- ugi.doAs(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- Path tempDotCopyFile = null;
- FlowFile putFlowFile = flowFile;
- try {
- final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
-
- final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
- final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
- final int bufferSize = getBufferSize(context, session, putFlowFile);
- final short replication = getReplication(context, session, putFlowFile, dirPath);
-
- final CompressionCodec codec = getCompressionCodec(context, configuration);
-
- final String filename = codec != null
- ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
- : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
-
- final Path tempCopyFile = new Path(dirPath, "." + filename);
- final Path copyFile = new Path(dirPath, filename);
-
- // Create destination directory if it does not exist
- try {
- if (!hdfs.getFileStatus(dirPath).isDirectory()) {
- throw new IOException(dirPath.toString() + " already exists and is not a directory");
- }
- } catch (FileNotFoundException fe) {
- if (!hdfs.mkdirs(dirPath)) {
- throw new IOException(dirPath.toString() + " could not be created");
- }
- changeOwner(context, hdfs, dirPath, flowFile);
- }
-
- final boolean destinationExists = hdfs.exists(copyFile);
-
- // If destination file already exists, resolve that based on processor configuration
- if (destinationExists) {
- switch (conflictResponse) {
- case REPLACE_RESOLUTION:
- if (hdfs.delete(copyFile, false)) {
- getLogger().info("deleted {} in order to replace with the contents of {}",
- new Object[]{copyFile, putFlowFile});
- }
- break;
- case IGNORE_RESOLUTION:
- session.transfer(putFlowFile, getSuccessRelationship());
- getLogger().info("transferring {} to success because file with same name already exists",
- new Object[]{putFlowFile});
- return null;
- case FAIL_RESOLUTION:
- session.transfer(session.penalize(putFlowFile), getFailureRelationship());
- getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
- new Object[]{putFlowFile});
- return null;
- default:
- break;
- }
- }
-
- // Write FlowFile to temp file on HDFS
- final StopWatch stopWatch = new StopWatch(true);
- session.read(putFlowFile, new InputStreamCallback() {
-
- @Override
- public void process(InputStream in) throws IOException {
- OutputStream fos = null;
- Path createdFile = null;
- try {
- if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
- fos = hdfs.append(copyFile, bufferSize);
- } else {
- final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
- if (shouldIgnoreLocality(context, session)) {
- cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
- }
-
- fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
- FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
- null, null);
- }
-
- if (codec != null) {
- fos = codec.createOutputStream(fos);
- }
- createdFile = tempCopyFile;
- BufferedInputStream bis = new BufferedInputStream(in);
- StreamUtils.copy(bis, fos);
- bis = null;
- fos.flush();
- } finally {
- try {
- if (fos != null) {
- fos.close();
- }
- } catch (Throwable t) {
- // when talking to remote HDFS clusters, we don't notice problems until fos.close()
- if (createdFile != null) {
- try {
- hdfs.delete(createdFile, false);
- } catch (Throwable ignore) {
- }
- }
- throw t;
- }
- fos = null;
- }
- }
-
- });
- stopWatch.stop();
- final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
- final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
- tempDotCopyFile = tempCopyFile;
-
- if (!conflictResponse.equals(APPEND_RESOLUTION)
- || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
- boolean renamed = false;
- for (int i = 0; i < 10; i++) { // try to rename multiple times.
- if (hdfs.rename(tempCopyFile, copyFile)) {
- renamed = true;
- break;// rename was successful
- }
- Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
- }
- if (!renamed) {
- hdfs.delete(tempCopyFile, false);
- throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
- + " to its final filename");
- }
-
- changeOwner(context, hdfs, copyFile, flowFile);
- }
-
- getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
- new Object[]{putFlowFile, copyFile, millis, dataRate});
-
- final String newFilename = copyFile.getName();
- final String hdfsPath = copyFile.getParent().toString();
- putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
- putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
- final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
- session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
-
- session.transfer(putFlowFile, getSuccessRelationship());
-
- } catch (final IOException e) {
- Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
- if (causeOptional.isPresent()) {
- getLogger().warn("An error occurred while connecting to HDFS. "
- + "Rolling back session, and penalizing flow file {}",
- new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
- session.rollback(true);
- } else {
- getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
- session.transfer(putFlowFile, getFailureRelationship());
- }
- } catch (final Throwable t) {
- if (tempDotCopyFile != null) {
- try {
- hdfs.delete(tempDotCopyFile, false);
- } catch (Exception e) {
- getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
- }
- }
- getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
- session.transfer(session.penalize(putFlowFile), getFailureRelationship());
- context.yield();
- }
-
- return null;
- }
- });
- }
-
- /**
- * Returns with the expected block size.
- */
- protected abstract long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);
-
- /**
- * Returns with the expected buffer size.
- */
- protected abstract int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
-
- /**
- * Returns with the expected replication factor.
- */
- protected abstract short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);
-
- /**
- * Returns if file system should ignore locality.
- */
- protected abstract boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session);
-
- /**
- * If returns a non-null value, the uploaded file's owner will be changed to this value after it is written. This only
- * works if NiFi is running as a user that has privilege to change owner.
- */
- protected abstract String getOwner(final ProcessContext context, final FlowFile flowFile);
-
- /**
- * If it returns a non-null value, the uploaded file's group will be changed to this value after it is written. This only
- * works if NiFi is running as a user that has privilege to change group.
- */
- protected abstract String getGroup(final ProcessContext context, final FlowFile flowFile);
-
- /**
- * @return The relationship the flow file will be transferred in case of successful execution.
- */
- protected abstract Relationship getSuccessRelationship();
-
- /**
- * @return The relationship the flow file will be transferred in case of failed execution.
- */
- protected abstract Relationship getFailureRelationship();
-
- /**
- * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
- * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
- * @param t The throwable to inspect for the cause.
- * @return
- */
- private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
- Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
- return causalChain
- .filter(expectedCauseType::isInstance)
- .map(expectedCauseType::cast)
- .filter(causePredicate)
- .findFirst();
- }
-
- protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
- try {
- // Change owner and group of file if configured to do so
- final String owner = getOwner(context, flowFile);
- final String group = getGroup(context, flowFile);
-
- if (owner != null || group != null) {
- hdfs.setOwner(name, owner, group);
- }
- } catch (Exception e) {
- getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 9296507..2c9285f 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -144,9 +144,7 @@
// We need a FlowFile to report provenance correctly.
final FlowFile finalFlowFile = originalFlowFile != null ? originalFlowFile : session.create();
-
- final String fileOrDirectoryName = getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString();
-
+ final String fileOrDirectoryName = getPath(context, session, finalFlowFile);
final FileSystem fileSystem = getFileSystem();
final UserGroupInformation ugi = getUserGroupInformation();
@@ -171,11 +169,11 @@
if (fileSystem.exists(path)) {
try {
Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
- attributes.put("hdfs.filename", path.getName());
- attributes.put("hdfs.path", path.getParent().toString());
+ attributes.put(getAttributePrefix() + ".filename", path.getName());
+ attributes.put(getAttributePrefix() + ".path", path.getParent().toString());
flowFile = session.putAllAttributes(flowFile, attributes);
- fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
+ fileSystem.delete(path, isRecursive(context, session));
getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
@@ -186,27 +184,47 @@
Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
// The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
- attributes.put("hdfs.error.message", ioe.getMessage());
+ attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
- session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE);
+ session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
failedPath++;
}
}
}
if (failedPath == 0) {
- session.transfer(flowFile, DeleteHDFS.REL_SUCCESS);
+ session.transfer(flowFile, getSuccessRelationship());
} else {
// If any path has been failed to be deleted, remove the FlowFile as it's been cloned and sent to failure.
session.remove(flowFile);
}
} catch (IOException e) {
getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
- session.transfer(flowFile, DeleteHDFS.REL_FAILURE);
+ session.transfer(flowFile, getFailureRelationship());
}
return null;
});
}
+
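+ // Extension points: subclasses may redirect the relationships, control recursion, resolve the
+ // target path differently or change the attribute prefix.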
+ protected Relationship getSuccessRelationship() {
+ return REL_SUCCESS;
+ }
+
+ protected Relationship getFailureRelationship() {
+ return REL_FAILURE;
+ }
+
+ protected boolean isRecursive(final ProcessContext context, final ProcessSession session) {
+ return context.getProperty(RECURSIVE).asBoolean();
+ }
+
+ protected String getPath(final ProcessContext context, final ProcessSession session, final FlowFile finalFlowFile) {
+ return getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString();
+ }
+
+ protected String getAttributePrefix() {
+ return "hdfs";
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index b60ee5b..e4acaac 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -17,7 +17,7 @@
package org.apache.nifi.processors.hadoop;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -121,16 +121,16 @@
final FileSystem hdfs = getFileSystem();
final UserGroupInformation ugi = getUserGroupInformation();
- final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String filenameValue = getPath(context, flowFile);
final Path path;
try {
- path = getNormalizedPath(context, FILENAME, flowFile);
+ path = getNormalizedPath(getPath(context, flowFile));
} catch (IllegalArgumentException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
- flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+ flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
+ session.transfer(flowFile, getFailureRelationship());
return;
}
@@ -144,7 +144,7 @@
CompressionCodec codec = null;
Configuration conf = getConfiguration();
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
- final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+ final CompressionType compressionType = getCompressionType(context);
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
if(inferCompressionCodec) {
@@ -174,16 +174,16 @@
stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
- session.transfer(flowFile, REL_SUCCESS);
+ session.transfer(flowFile, getSuccessRelationship());
} catch (final FileNotFoundException | AccessControlException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
- flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+ flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
+ session.transfer(flowFile, getFailureRelationship());
} catch (final IOException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_COMMS_FAILURE);
+ session.transfer(flowFile, getCommsFailureRelationship());
} finally {
IOUtils.closeQuietly(stream);
}
@@ -191,7 +191,29 @@
return null;
}
});
-
}
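+ // Extension points: subclasses may redirect the relationships, resolve the path differently,
+ // change the attribute prefix or override the compression type.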
+ protected Relationship getSuccessRelationship() {
+ return REL_SUCCESS;
+ }
+
+ protected Relationship getFailureRelationship() {
+ return REL_FAILURE;
+ }
+
+ protected Relationship getCommsFailureRelationship() {
+ return REL_COMMS_FAILURE;
+ }
+
+ protected String getPath(final ProcessContext context, final FlowFile flowFile) {
+ return context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+ }
+
+ protected String getAttributePrefix() {
+ return "hdfs";
+ }
+
+ protected CompressionType getCompressionType(final ProcessContext context) {
+ return CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+ }
}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 583b0b8..907a141 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -503,7 +503,7 @@
final Map<String, String> attributes = createAttributes(status);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
+ session.transfer(flowFile, getSuccessRelationship());
}
}
@@ -528,7 +528,7 @@
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
+ session.transfer(flowFile, getSuccessRelationship());
}
private Record createRecord(final FileStatus fileStatus) {
@@ -620,15 +620,15 @@
attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
- attributes.put("hdfs.owner", status.getOwner());
- attributes.put("hdfs.group", status.getGroup());
- attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
- attributes.put("hdfs.length", String.valueOf(status.getLen()));
- attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
+ attributes.put(getAttributePrefix() + ".owner", status.getOwner());
+ attributes.put(getAttributePrefix() + ".group", status.getGroup());
+ attributes.put(getAttributePrefix() + ".lastModified", String.valueOf(status.getModificationTime()));
+ attributes.put(getAttributePrefix() + ".length", String.valueOf(status.getLen()));
+ attributes.put(getAttributePrefix() + ".replication", String.valueOf(status.getReplication()));
final FsPermission permission = status.getPermission();
final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
- attributes.put("hdfs.permissions", perms);
+ attributes.put(getAttributePrefix() + ".permissions", perms);
return attributes;
}
@@ -669,4 +669,11 @@
};
}
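+ // Extension points: subclasses may redirect the success relationship or change the attribute prefix.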
+ protected Relationship getSuccessRelationship() {
+ return REL_SUCCESS;
+ }
+
+ protected String getAttributePrefix() {
+ return "hdfs";
+ }
}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 00942e3..29327ec 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -16,9 +16,15 @@
*/
package org.apache.nifi.processors.hadoop;
+import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -29,22 +35,40 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+import org.ietf.jgss.GSSException;
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
/**
* This processor copies FlowFiles to HDFS.
@@ -63,7 +87,11 @@
requiredPermission = RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM,
explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
})
-public class PutHDFS extends AbstractPutHDFS {
+public class PutHDFS extends AbstractHadoopProcessor {
+
+ protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+ protected static final int BUFFER_SIZE_DEFAULT = 4096;
+
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -78,6 +106,28 @@
// properties
+ protected static final String REPLACE_RESOLUTION = "replace";
+ protected static final String IGNORE_RESOLUTION = "ignore";
+ protected static final String FAIL_RESOLUTION = "fail";
+ protected static final String APPEND_RESOLUTION = "append";
+
+ protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+ REPLACE_RESOLUTION, "Replaces the existing file if any.");
+ protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+ "Ignores the flow file and routes it to success.");
+ protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+ "Penalizes the flow file and routes it to failure.");
+ protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
+ "Appends to the existing file if any, creates a new file otherwise.");
+
+ protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+ .name("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the same name already exists in the output directory")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION_AV.getValue())
+ .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
+ .build();
+
public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
.name("Block Size")
.description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
@@ -179,48 +229,263 @@
}
@Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final FileSystem hdfs = getFileSystem();
+ final Configuration configuration = getConfiguration();
+ final UserGroupInformation ugi = getUserGroupInformation();
+
+ if (configuration == null || hdfs == null || ugi == null) {
+ getLogger().error("HDFS not configured properly");
+ session.transfer(flowFile, getFailureRelationship());
+ context.yield();
+ return;
+ }
+
+ ugi.doAs(new PrivilegedAction<Object>() {
+ @Override
+ public Object run() {
+ Path tempDotCopyFile = null;
+ FlowFile putFlowFile = flowFile;
+ try {
+ final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
+
+ final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
+ final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
+ final int bufferSize = getBufferSize(context, session, putFlowFile);
+ final short replication = getReplication(context, session, putFlowFile, dirPath);
+
+ final CompressionCodec codec = getCompressionCodec(context, configuration);
+
+ final String filename = codec != null
+ ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+ : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+ final Path tempCopyFile = new Path(dirPath, "." + filename);
+ final Path copyFile = new Path(dirPath, filename);
+
+ // Create destination directory if it does not exist
+ try {
+ if (!hdfs.getFileStatus(dirPath).isDirectory()) {
+ throw new IOException(dirPath.toString() + " already exists and is not a directory");
+ }
+ } catch (FileNotFoundException fe) {
+ if (!hdfs.mkdirs(dirPath)) {
+ throw new IOException(dirPath.toString() + " could not be created");
+ }
+ changeOwner(context, hdfs, dirPath, flowFile);
+ }
+
+ final boolean destinationExists = hdfs.exists(copyFile);
+
+ // If destination file already exists, resolve that based on processor configuration
+ if (destinationExists) {
+ switch (conflictResponse) {
+ case REPLACE_RESOLUTION:
+ if (hdfs.delete(copyFile, false)) {
+ getLogger().info("deleted {} in order to replace with the contents of {}",
+ new Object[]{copyFile, putFlowFile});
+ }
+ break;
+ case IGNORE_RESOLUTION:
+ session.transfer(putFlowFile, getSuccessRelationship());
+ getLogger().info("transferring {} to success because file with same name already exists",
+ new Object[]{putFlowFile});
+ return null;
+ case FAIL_RESOLUTION:
+ session.transfer(session.penalize(putFlowFile), getFailureRelationship());
+ getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
+ new Object[]{putFlowFile});
+ return null;
+ default:
+ break;
+ }
+ }
+
+ // Write FlowFile to temp file on HDFS
+ final StopWatch stopWatch = new StopWatch(true);
+ session.read(putFlowFile, new InputStreamCallback() {
+
+ @Override
+ public void process(InputStream in) throws IOException {
+ OutputStream fos = null;
+ Path createdFile = null;
+ try {
+ if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+ fos = hdfs.append(copyFile, bufferSize);
+ } else {
+ final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+
+ if (shouldIgnoreLocality(context, session)) {
+ cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
+ }
+
+ fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+ FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+ null, null);
+ }
+
+ if (codec != null) {
+ fos = codec.createOutputStream(fos);
+ }
+ createdFile = tempCopyFile;
+ BufferedInputStream bis = new BufferedInputStream(in);
+ StreamUtils.copy(bis, fos);
+ bis = null;
+ fos.flush();
+ } finally {
+ try {
+ if (fos != null) {
+ fos.close();
+ }
+ } catch (Throwable t) {
+ // when talking to remote HDFS clusters, we don't notice problems until fos.close()
+ if (createdFile != null) {
+ try {
+ hdfs.delete(createdFile, false);
+ } catch (Throwable ignore) {
+ }
+ }
+ throw t;
+ }
+ fos = null;
+ }
+ }
+
+ });
+ stopWatch.stop();
+ final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
+ final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ tempDotCopyFile = tempCopyFile;
+
+ if (!conflictResponse.equals(APPEND_RESOLUTION)
+ || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
+ boolean renamed = false;
+ for (int i = 0; i < 10; i++) { // try to rename multiple times.
+ if (hdfs.rename(tempCopyFile, copyFile)) {
+ renamed = true;
+ break;// rename was successful
+ }
+ Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+ }
+ if (!renamed) {
+ hdfs.delete(tempCopyFile, false);
+ throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+ + " to its final filename");
+ }
+
+ changeOwner(context, hdfs, copyFile, flowFile);
+ }
+
+ getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
+ new Object[]{putFlowFile, copyFile, millis, dataRate});
+
+ final String newFilename = copyFile.getName();
+ final String hdfsPath = copyFile.getParent().toString();
+ putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
+ putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+ final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+ session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
+
+ session.transfer(putFlowFile, getSuccessRelationship());
+
+ } catch (final IOException e) {
+ Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+ if (causeOptional.isPresent()) {
+ getLogger().warn("An error occurred while connecting to HDFS. "
+ + "Rolling back session, and penalizing flow file {}",
+ new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
+ session.rollback(true);
+ } else {
+ getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
+ session.transfer(putFlowFile, getFailureRelationship());
+ }
+ } catch (final Throwable t) {
+ if (tempDotCopyFile != null) {
+ try {
+ hdfs.delete(tempDotCopyFile, false);
+ } catch (Exception e) {
+ getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
+ }
+ }
+ getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
+ session.transfer(session.penalize(putFlowFile), getFailureRelationship());
+ context.yield();
+ }
+
+ return null;
+ }
+ });
+ }
+
protected Relationship getSuccessRelationship() {
return REL_SUCCESS;
}
- @Override
protected Relationship getFailureRelationship() {
return REL_FAILURE;
}
- @Override
protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(dirPath);
}
- @Override
protected int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
return bufferSizeProp != null ? bufferSizeProp.intValue() : getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
}
- @Override
protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
return replicationProp != null ? replicationProp.shortValue() : getFileSystem()
.getDefaultReplication(dirPath);
}
- @Override
protected boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session) {
return context.getProperty(IGNORE_LOCALITY).asBoolean();
}
- @Override
protected String getOwner(final ProcessContext context, final FlowFile flowFile) {
final String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
return owner == null || owner.isEmpty() ? null : owner;
}
- @Override
protected String getGroup(final ProcessContext context, final FlowFile flowFile) {
final String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
return group == null || group.isEmpty() ? null : group;
}
+
+ /**
+ * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
+ * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
+ * @param t The throwable to inspect for the cause.
+ * @param expectedCauseType The type of cause to look for in the causal chain.
+ * @param causePredicate An additional predicate the matching cause must satisfy.
+ * @return An optional with the first matching cause, or {@link Optional#empty()} if there is none.
+ */
+ private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
+ Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
+ return causalChain
+ .filter(expectedCauseType::isInstance)
+ .map(expectedCauseType::cast)
+ .filter(causePredicate)
+ .findFirst();
+ }
+
+ protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
+ try {
+ // Change owner and group of file if configured to do so
+ final String owner = getOwner(context, flowFile);
+ final String group = getGroup(context, flowFile);
+
+ if (owner != null || group != null) {
+ hdfs.setOwner(name, owner, group);
+ }
+ } catch (Exception e) {
+ getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index 0cd069e..342c71b 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -46,6 +46,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
@@ -124,7 +125,8 @@
final File brokenCoreSite = new File("src/test/resources/core-site-broken.xml");
final ResourceReference brokenCoreSiteReference = new FileResourceReference(brokenCoreSite);
final ResourceReferences references = new StandardResourceReferences(Collections.singletonList(brokenCoreSiteReference));
- processor.resetHDFSResources(references, runner.getProcessContext());
+ final List<String> locations = references.asLocations();
+ processor.resetHDFSResources(locations, runner.getProcessContext());
Assert.fail("Should have thrown SocketTimeoutException");
} catch (IOException e) {
}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
index ab49d63..b7920b8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -35,6 +34,7 @@
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.List;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
@@ -96,7 +96,7 @@
public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
@Override
- HdfsResources resetHDFSResources(ResourceReferences configResources, ProcessContext context) throws IOException {
+ HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
return hdfsResources;
}