blob: e627250886ed4c2435f4bc101cb4e118ecba1d53 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.file.transfer.PutFileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"remote", "copy", "egress", "put", "ftp", "archive", "files"})
@CapabilityDescription("Sends FlowFiles to an FTP Server")
@SeeAlso(GetFTP.class)
@DynamicProperties({
@DynamicProperty(name = "pre.cmd._____", value = "Not used", description = "The command specified in the key will be executed before doing a put. You may add these optional properties "
+ " to send any commands to the FTP server before the file is actually transferred (before the put command)."
+ " This option is only available for the PutFTP processor, as only FTP has this functionality. This is"
+ " essentially the same as sending quote commands to an FTP server from the command line. While this is the same as sending a quote command, it is very important that"
+ " you leave off the ."),
@DynamicProperty(name = "post.cmd._____", value = "Not used", description = "The command specified in the key will be executed after doing a put. You may add these optional properties "
+ " to send any commands to the FTP server before the file is actually transferred (before the put command)."
+ " This option is only available for the PutFTP processor, as only FTP has this functionality. This is"
+ " essentially the same as sending quote commands to an FTP server from the command line. While this is the same as sending a quote command, it is very important that"
+ " you leave off the .")})
public class PutFTP extends PutFileTransfer<FTPTransfer> {
private static final Pattern PRE_SEND_CMD_PATTERN = Pattern.compile("^pre\\.cmd\\.(\\d+)$");
private static final Pattern POST_SEND_CMD_PATTERN = Pattern.compile("^post\\.cmd\\.(\\d+)$");
private final AtomicReference<List<PropertyDescriptor>> preSendDescriptorRef = new AtomicReference<>();
private final AtomicReference<List<PropertyDescriptor>> postSendDescriptorRef = new AtomicReference<>();
private List<PropertyDescriptor> properties;
// PutFileTransfer.onTrigger() uses FlowFile attributes
public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(FTPTransfer.REMOTE_PATH)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(FTPTransfer.HOSTNAME);
properties.add(FTPTransfer.PORT);
properties.add(FTPTransfer.USERNAME);
properties.add(FTPTransfer.PASSWORD);
properties.add(REMOTE_PATH);
properties.add(FTPTransfer.CREATE_DIRECTORY);
properties.add(FTPTransfer.BATCH_SIZE);
properties.add(FTPTransfer.CONNECTION_TIMEOUT);
properties.add(FTPTransfer.DATA_TIMEOUT);
properties.add(FTPTransfer.CONFLICT_RESOLUTION);
properties.add(FTPTransfer.DOT_RENAME);
properties.add(FTPTransfer.TEMP_FILENAME);
properties.add(FTPTransfer.TRANSFER_MODE);
properties.add(FTPTransfer.CONNECTION_MODE);
properties.add(FTPTransfer.REJECT_ZERO_BYTE);
properties.add(FTPTransfer.LAST_MODIFIED_TIME);
properties.add(FTPTransfer.PERMISSIONS);
properties.add(FTPTransfer.USE_COMPRESSION);
properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FTPTransfer.BUFFER_SIZE);
properties.add(FTPTransfer.UTF8_ENCODING);
this.properties = Collections.unmodifiableList(properties);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected void beforePut(final FlowFile flowFile, final ProcessContext context, final FTPTransfer transfer) throws IOException {
transfer.sendCommands(getCommands(preSendDescriptorRef.get(), context, flowFile), flowFile);
}
@Override
protected void afterPut(final FlowFile flowFile, final ProcessContext context, final FTPTransfer transfer) throws IOException {
transfer.sendCommands(getCommands(postSendDescriptorRef.get(), context, flowFile), flowFile);
}
@Override
protected FTPTransfer getFileTransfer(final ProcessContext context) {
return new FTPTransfer(context, getLogger());
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.build();
}
@OnScheduled
public void determinePrePostSendProperties(final ProcessContext context) {
final Map<Integer, PropertyDescriptor> preDescriptors = new TreeMap<>();
final Map<Integer, PropertyDescriptor> postDescriptors = new TreeMap<>();
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
final String name = descriptor.getName();
final Matcher preMatcher = PRE_SEND_CMD_PATTERN.matcher(name);
if (preMatcher.matches()) {
final int index = Integer.parseInt(preMatcher.group(1));
preDescriptors.put(index, descriptor);
} else {
final Matcher postMatcher = POST_SEND_CMD_PATTERN.matcher(name);
if (postMatcher.matches()) {
final int index = Integer.parseInt(postMatcher.group(1));
postDescriptors.put(index, descriptor);
}
}
}
final List<PropertyDescriptor> preDescriptorList = new ArrayList<>(preDescriptors.values());
final List<PropertyDescriptor> postDescriptorList = new ArrayList<>(postDescriptors.values());
this.preSendDescriptorRef.set(preDescriptorList);
this.postSendDescriptorRef.set(postDescriptorList);
}
private List<String> getCommands(final List<PropertyDescriptor> descriptors, final ProcessContext context, final FlowFile flowFile) {
final List<String> cmds = new ArrayList<>();
for (final PropertyDescriptor descriptor : descriptors) {
cmds.add(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
}
return cmds;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
FTPTransfer.validateProxySpec(validationContext, results);
return results;
}
}