/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* This processor copies FlowFiles to HDFS.
*/
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "HCFS", "HDFS", "put", "copy", "filesystem"})
@CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")
})
@SeeAlso(GetHDFS.class)
@Restricted(restrictions = {
@Restriction(
requiredPermission = RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM,
explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
})
public class PutHDFS extends AbstractPutHDFS {
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Files that have been successfully written to HDFS are transferred to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Files that could not be written to HDFS for some reason are transferred to this relationship")
.build();
// properties
public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
.name("Block Size")
.description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("IO Buffer Size")
.description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor REPLICATION_FACTOR = new PropertyDescriptor.Builder()
.name("Replication")
.description("Number of times that HDFS will replicate each file. This overrides the Hadoop Configuration")
.addValidator(HadoopValidators.POSITIVE_SHORT_VALIDATOR)
.build();
public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
.name("Permissions umask")
.description(
"A umask represented as an octal number which determines the permissions of files written to HDFS. " +
"This overrides the Hadoop property \"fs.permissions.umask-mode\". " +
"If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used.")
.addValidator(HadoopValidators.UMASK_VALIDATOR)
.build();
public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
.name("Remote Owner")
.description(
"Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
.name("Remote Group")
.description(
"Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor IGNORE_LOCALITY = new PropertyDescriptor.Builder()
.name("Ignore Locality")
.displayName("Ignore Locality")
.description(
"Directs the HDFS system to ignore locality rules so that data is distributed randomly throughout the cluster")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
private static final Set<Relationship> relationships;
static {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}
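
/**
 * Returns the fixed set of relationships ({@code success} and {@code failure}) this processor supports.
 */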
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
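
/**
 * Builds the full property list: the common Hadoop properties inherited from the parent class
 * (the {@code properties} list), followed by the PutHDFS-specific properties, with the shared
 * {@code DIRECTORY} descriptor re-described for this processor.
 */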
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> props = new ArrayList<>(properties);
props.add(new PropertyDescriptor.Builder()
.fromPropertyDescriptor(DIRECTORY)
.description("The parent HDFS directory to which files should be written. The directory will be created if it doesn't exist.")
.build());
props.add(CONFLICT_RESOLUTION);
props.add(BLOCK_SIZE);
props.add(BUFFER_SIZE);
props.add(REPLICATION_FACTOR);
props.add(UMASK);
props.add(REMOTE_OWNER);
props.add(REMOTE_GROUP);
props.add(COMPRESSION_CODEC);
props.add(IGNORE_LOCALITY);
return props;
}
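
/**
 * Resolves the umask before any FlowFiles are processed: the "Permissions umask" property when set
 * (parsed as an octal value), otherwise the umask already present in the Hadoop configuration
 * (ultimately the Hadoop default "022").
 */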
@Override
protected void preProcessConfiguration(final Configuration config, final ProcessContext context) {
// Set umask once, to avoid thread safety issues doing it in onTrigger
final PropertyValue umaskProp = context.getProperty(UMASK);
final short dfsUmask;
if (umaskProp.isSet()) {
dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
} else {
dfsUmask = FsPermission.getUMask(config).toShort();
}
FsPermission.setUMask(config, new FsPermission(dfsUmask));
}
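
/**
 * Returns the relationship to which successfully written FlowFiles are routed.
 */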
@Override
protected Relationship getSuccessRelationship() {
return REL_SUCCESS;
}
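
/**
 * Returns the relationship to which FlowFiles that could not be written are routed.
 */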
@Override
protected Relationship getFailureRelationship() {
return REL_FAILURE;
}
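
/**
 * Resolves the block size for the file being written: the "Block Size" property when set,
 * otherwise the filesystem's default block size for the target directory.
 */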
@Override
protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath) {
final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(dirPath);
}
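
/**
 * Resolves the IO buffer size: the "IO Buffer Size" property when set, otherwise the Hadoop
 * configuration value keyed by {@code BUFFER_SIZE_KEY}, falling back to {@code BUFFER_SIZE_DEFAULT}
 * (both constants inherited from the parent class).
 */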
@Override
protected int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
return bufferSizeProp != null ? bufferSizeProp.intValue() : getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
}
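
/**
 * Resolves the replication factor: the "Replication" property when set, otherwise the filesystem's
 * default replication for the target directory.
 */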
@Override
protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath) {
final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
return replicationProp != null ? replicationProp.shortValue() : getFileSystem().getDefaultReplication(dirPath);
}
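
/**
 * Indicates whether HDFS locality rules should be ignored, as configured by the
 * "Ignore Locality" property (defaults to false).
 */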
@Override
protected boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session) {
return context.getProperty(IGNORE_LOCALITY).asBoolean();
}
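
/**
 * Returns the owner to assign to the written file, or {@code null} when the "Remote Owner"
 * property is unset or evaluates to an empty string.
 */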
@Override
protected String getOwner(final ProcessContext context, final FlowFile flowFile) {
final String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
return owner == null || owner.isEmpty() ? null : owner;
}
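
/**
 * Returns the group to assign to the written file, or {@code null} when the "Remote Group"
 * property is unset or evaluates to an empty string.
 */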
@Override
protected String getGroup(final ProcessContext context, final FlowFile flowFile) {
final String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
return group == null || group.isEmpty() ? null : group;
}
}