* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
* Base class for processors that write Records to HDFS.
@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.displayName("Compression Type")
.description("The type of compression for the file being written.")
public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
.displayName("Overwrite Files")
.description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, " +
"flow files will be routed to failure when a file exists in the same directory with the same name.")
.allowableValues("true", "false")
public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
.displayName("Permissions umask")
.description("A umask represented as an octal number which determines the permissions of files written to HDFS. " +
"This overrides the Hadoop Configuration dfs.umaskmode")
public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
.displayName("Remote Owner")
.description("Changes the owner of the HDFS file to this value after it is written. " +
"This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
.displayName("Remote Group")
.description("Changes the group of the HDFS file to this value after it is written. " +
"This only works if NiFi is running as a user that has HDFS super user privilege to change group")
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.displayName("Record Reader")
.description("The service for reading records from incoming flow files.")
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("Flow Files that have been successfully processed are transferred to this relationship")
public static final Relationship REL_RETRY = new Relationship.Builder()
.description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship")
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("Flow Files that could not be processed due to issue that cannot be retried are transferred to this relationship")
public static final String RECORD_COUNT_ATTR = "record.count";
private volatile String remoteOwner;
private volatile String remoteGroup;
private volatile Set<Relationship> putHdfsRecordRelationships;
private volatile List<PropertyDescriptor> putHdfsRecordProperties;
protected final void init(final ProcessorInitializationContext context) {
final Set<Relationship> rels = new HashSet<>();
this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> props = new ArrayList<>(properties);
props.add(new PropertyDescriptor.Builder()
.description("The parent directory to which files should be written. Will be created if it doesn't exist.")
final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
props.add(new PropertyDescriptor.Builder()
this.putHdfsRecordProperties = Collections.unmodifiableList(props);
* @param context the initialization context
* @return the possible compression types
public abstract List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context);
* @param context the initialization context
* @return the default compression type
public abstract String getDefaultCompressionType(final ProcessorInitializationContext context);
* Allows sub-classes to add additional properties, called from initialize.
* @return additional properties to add to the overall list
public List<PropertyDescriptor> getAdditionalProperties() {
return Collections.emptyList();
public final Set<Relationship> getRelationships() {
return putHdfsRecordRelationships;
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return putHdfsRecordProperties;
protected void preProcessConfiguration(Configuration config, ProcessContext context) {
// Set umask once, to avoid thread safety issues doing it in onTrigger
final PropertyValue umaskProp = context.getProperty(UMASK);
final short dfsUmask;
if (umaskProp.isSet()) {
dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
} else {
dfsUmask = FsPermission.DEFAULT_UMASK;
FsPermission.setUMask(config, new FsPermission(dfsUmask));
public final void onScheduled(final ProcessContext context) throws IOException {
this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
* Sub-classes provide the appropriate HDFSRecordWriter.
* @param context the process context to obtain additional configuration
* @param flowFile the flow file being written
* @param conf the Configuration instance
* @param path the path to write to
* @param schema the schema for writing
* @return the HDFSRecordWriter
* @throws IOException if an error occurs creating the writer or processing the schema
public abstract HDFSRecordWriter createHDFSRecordWriter(
final ProcessContext context,
final FlowFile flowFile,
final Configuration conf,
final Path path,
final RecordSchema schema) throws IOException, SchemaNotFoundException;
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
final FileSystem fileSystem = getFileSystem();
final Configuration configuration = getConfiguration();
final UserGroupInformation ugi = getUserGroupInformation();
if (configuration == null || fileSystem == null || ugi == null) {
getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
final FlowFile flowFile = session.get();
if (flowFile == null) {
ugi.doAs((PrivilegedAction<Object>)() -> {
Path tempDotCopyFile = null;
FlowFile putFlowFile = flowFile;
try {
final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
// create the directory if it doesn't exist
final Path directoryPath = new Path(directoryValue);
createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
// write to tempFile first and on success rename to destFile
final Path tempFile = new Path(directoryPath, "." + filenameValue);
final Path destFile = new Path(directoryPath, filenameValue);
final boolean destinationOrTempExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
// if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
if (destinationOrTempExists && !shouldOverwrite) {
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
return null;
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final FlowFile flowFileIn = putFlowFile;
final StopWatch stopWatch = new StopWatch(true);
// Read records from the incoming FlowFile and write them the tempFile, (final InputStream in) -> {
RecordReader recordReader = null;
HDFSRecordWriter recordWriter = null;
try {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
try {
recordReader = recordReaderFactory.createRecordReader(flowFileIn, in, getLogger());
} catch (Exception e) {
final RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", e);
final RecordSet recordSet = recordReader.createRecordSet();
recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, recordReader.getSchema());
} catch (Exception e) {
} finally {
final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempFile;
// if any errors happened within the then throw the exception so we jump
// into one of the appropriate catch blocks below
if (exceptionHolder.get() != null) {
throw exceptionHolder.get();
final boolean destinationExists = fileSystem.exists(destFile);
// If destination file already exists, resolve that based on processor configuration
if (destinationExists && shouldOverwrite) {
if (fileSystem.delete(destFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{destFile, putFlowFile});
// Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
rename(fileSystem, tempFile, destFile);
changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate});
putFlowFile = postProcess(context, session, putFlowFile, destFile);
final String newFilename = destFile.getName();
final String hdfsPath = destFile.getParent().toString();
// Update the filename and absolute path attributes
final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
attributes.put(CoreAttributes.FILENAME.key(), newFilename);
attributes.put(ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
putFlowFile = session.putAllAttributes(putFlowFile, attributes);
// Send a provenance event and transfer to success
final Path qualifiedPath = destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
session.transfer(putFlowFile, REL_SUCCESS);
} catch (IOException | FlowFileAccessException e) {
deleteQuietly(fileSystem, tempDotCopyFile);
getLogger().error("Failed to write due to {}", new Object[]{e});
session.transfer(session.penalize(putFlowFile), REL_RETRY);
} catch (Throwable t) {
deleteQuietly(fileSystem, tempDotCopyFile);
getLogger().error("Failed to write due to {}", new Object[]{t});
session.transfer(putFlowFile, REL_FAILURE);
return null;
* This method will be called after successfully writing to the destination file and renaming the file to it's final name
* in order to give sub-classes a chance to take action before transferring to success.
* @param context the context
* @param session the session
* @param flowFile the flow file being processed
* @param destFile the destination file written to
* @return an updated FlowFile reference
protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
return flowFile;
* Attempts to rename srcFile to destFile up to 10 times, with a 200ms sleep in between each attempt.
* If the file has not been renamed after 10 attempts, a FailureException is thrown.
* @param fileSystem the file system where the files are located
* @param srcFile the source file
* @param destFile the destination file to rename the source to
* @throws IOException if IOException happens while attempting to rename
* @throws InterruptedException if renaming is interrupted
* @throws FailureException if the file couldn't be renamed after 10 attempts
protected void rename(final FileSystem fileSystem, final Path srcFile, final Path destFile) throws IOException, InterruptedException, FailureException {
boolean renamed = false;
for (int i = 0; i < 10; i++) { // try to rename multiple times.
if (fileSystem.rename(srcFile, destFile)) {
renamed = true;
break;// rename was successful
Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
if (!renamed) {
fileSystem.delete(srcFile, false);
throw new FailureException("Could not rename file " + srcFile + " to its final filename");
* Deletes the given file from the given filesystem. Any exceptions that are encountered will be caught and logged, but not thrown.
* @param fileSystem the filesystem where the file exists
* @param file the file to delete
protected void deleteQuietly(final FileSystem fileSystem, final Path file) {
if (file != null) {
try {
fileSystem.delete(file, false);
} catch (Exception e) {
getLogger().error("Unable to remove file {} due to {}", new Object[]{file, e});
* Changes the ownership of the given file.
* @param fileSystem the filesystem where the file exists
* @param path the file to change ownership on
* @param remoteOwner the new owner for the file
* @param remoteGroup the new group for the file
protected void changeOwner(final FileSystem fileSystem, final Path path, final String remoteOwner, final String remoteGroup) {
try {
// Change owner and group of file if configured to do so
if (remoteOwner != null || remoteGroup != null) {
fileSystem.setOwner(path, remoteOwner, remoteGroup);
} catch (Exception e) {
getLogger().warn("Could not change owner or group of {} on due to {}", new Object[]{path, e});
* Creates the given directory and changes the ownership to the specified owner/group.
* @param fileSystem the filesystem to create the directory on
* @param directory the directory to create
* @param remoteOwner the owner for changing ownership of the directory
* @param remoteGroup the group for changing ownership of the directory
* @throws IOException if an error occurs obtaining the file status or issuing the mkdir command
* @throws FailureException if the directory could not be created
protected void createDirectory(final FileSystem fileSystem, final Path directory, final String remoteOwner, final String remoteGroup) throws IOException, FailureException {
try {
if (!fileSystem.getFileStatus(directory).isDirectory()) {
throw new FailureException(directory.toString() + " already exists and is not a directory");
} catch (FileNotFoundException fe) {
if (!fileSystem.mkdirs(directory)) {
throw new FailureException(directory.toString() + " could not be created");
changeOwner(fileSystem, directory, remoteOwner, remoteGroup);