blob: b729679f87d51092f27a58334ecfa6dbed071442 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Base class for PutFTP and PutSFTP.
 *
 * <p>Each trigger pulls FlowFiles from the session and pushes their content to a remote
 * system through a single {@link FileTransfer} instance obtained from
 * {@link #getFileTransfer(ProcessContext)}. Before sending, every FlowFile is checked for
 * zero-byte content and remote filename conflicts; the outcome of that check decides
 * whether the content is sent and which relationship the FlowFile is routed to.</p>
 *
 * @param <T> type of transfer
 */
public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractProcessor {

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that are successfully sent will be routed to success")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that failed to send to the remote system; failure is usually looped back to this processor")
            .build();
    public static final Relationship REL_REJECT = new Relationship.Builder()
            .name("reject")
            .description("FlowFiles that were rejected by the destination system")
            .build();

    /** Highest numeric prefix tried when generating a unique name for the RENAME conflict resolution. */
    private static final int MAX_RENAME_ATTEMPTS = 99;

    private final Set<Relationship> relationships;

    /**
     * Creates the processor and fixes its relationship set to success, failure and reject.
     */
    public PutFileTransfer() {
        super();
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_REJECT);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    /**
     * @return the unmodifiable set containing the success, failure and reject relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Supplies the transfer implementation used for one invocation of
     * {@link #onTrigger(ProcessContext, ProcessSession)}. The returned instance is closed
     * when that invocation finishes.
     *
     * @param context the process context of the current trigger
     * @return the transfer used to talk to the remote system
     */
    protected abstract T getFileTransfer(final ProcessContext context);

    /**
     * Hook invoked immediately before the content of a FlowFile is sent. Does nothing by default.
     *
     * @param flowFile the FlowFile about to be sent
     * @param context the process context of the current trigger
     * @param transfer the transfer that will send the content
     * @throws IOException if the hook fails; the FlowFile is then penalized and routed to failure
     */
    protected void beforePut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException {
    }

    /**
     * Hook invoked immediately after the content of a FlowFile was sent. Does nothing by default.
     *
     * @param flowFile the FlowFile that was sent
     * @param context the process context of the current trigger
     * @param transfer the transfer that sent the content
     * @throws IOException if the hook fails; the FlowFile is then penalized and routed to failure
     */
    protected void afterPut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException {
    }

    /**
     * Sends up to {@code BATCH_SIZE} FlowFiles to the remote system using one transfer instance.
     *
     * <p>The batch stops early when the processor is no longer scheduled, when not all
     * relationships are available, or when the session has no more FlowFiles. Each FlowFile
     * is routed and the session is committed asynchronously before the next one is pulled.
     * If an {@link IOException}, {@link FlowFileAccessException} or {@link ProcessException}
     * is raised, the processor yields, and the FlowFile being processed (if it has not been
     * routed yet) is penalized and sent to failure.</p>
     *
     * @param context the process context providing the configured properties
     * @param session the session used to obtain, read and route FlowFiles
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();
        String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();

        final int maxNumberOfFiles = context.getProperty(FileTransfer.BATCH_SIZE).asInteger();
        int fileCount = 0;

        // True while 'flowFile' is held by the session and has not been routed yet. The catch
        // handlers below may also run for failures raised after the last FlowFile was routed
        // (for example while the transfer is being closed); at that point 'flowFile' is either
        // null or already handed off, and must not be penalized or transferred again.
        boolean awaitingRouting = true;

        try (final T transfer = getFileTransfer(context)) {
            do {
                awaitingRouting = true;

                // re-evaluate the hostname per FlowFile when it contains Expression Language
                if (context.getProperty(FileTransfer.HOSTNAME).isExpressionLanguagePresent()) {
                    hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
                }

                final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue();
                final String workingDirPath;
                if (StringUtils.isBlank(rootPath)) {
                    workingDirPath = transfer.getHomeDirectory(flowFile);
                } else {
                    workingDirPath = transfer.getAbsolutePath(flowFile, rootPath);
                }

                final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean();
                final ConflictResult conflictResult
                        = identifyAndResolveConflictFile(context.getProperty(FileTransfer.CONFLICT_RESOLUTION).getValue(), transfer, workingDirPath, flowFile, rejectZeroByteFiles, logger);

                if (conflictResult.isTransfer()) {
                    final StopWatch stopWatch = new StopWatch();
                    stopWatch.start();

                    beforePut(flowFile, context, transfer);
                    // 'flowFile' is reassigned by the loop, so the callback needs a final alias
                    final FlowFile flowFileToTransfer = flowFile;
                    final AtomicReference<String> fullPathRef = new AtomicReference<>(null);
                    session.read(flowFile, new InputStreamCallback() {
                        @Override
                        public void process(final InputStream in) throws IOException {
                            try (final InputStream bufferedIn = new BufferedInputStream(in)) {
                                if (workingDirPath != null && context.getProperty(SFTPTransfer.CREATE_DIRECTORY).asBoolean()) {
                                    transfer.ensureDirectoryExists(flowFileToTransfer, new File(workingDirPath));
                                }

                                fullPathRef.set(transfer.put(flowFileToTransfer, workingDirPath, conflictResult.getFileName(), bufferedIn));
                            }
                        }
                    });
                    afterPut(flowFile, context, transfer);

                    stopWatch.stop();
                    final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                    logger.info("Successfully transferred {} to {} on remote host {} in {} milliseconds at a rate of {}",
                            new Object[]{flowFile, fullPathRef.get(), hostname, millis, dataRate});

                    // the provenance URI needs an absolute path component
                    String fullPathWithSlash = fullPathRef.get();
                    if (!fullPathWithSlash.startsWith("/")) {
                        fullPathWithSlash = "/" + fullPathWithSlash;
                    }
                    final String destinationUri = transfer.getProtocolName() + "://" + hostname + fullPathWithSlash;
                    session.getProvenanceReporter().send(flowFile, destinationUri, millis);
                }

                if (conflictResult.isPenalize()) {
                    flowFile = session.penalize(flowFile);
                }

                session.transfer(flowFile, conflictResult.getRelationship());
                awaitingRouting = false;
                session.commitAsync();
            } while (isScheduled()
                    && (getRelationships().size() == context.getAvailableRelationships().size())
                    && (++fileCount < maxNumberOfFiles)
                    && ((flowFile = session.get()) != null));
        } catch (final IOException e) {
            context.yield();
            logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e});
            penalizeAndRouteToFailure(session, flowFile, awaitingRouting);
        } catch (final FlowFileAccessException e) {
            context.yield();
            logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e.getCause()});
            penalizeAndRouteToFailure(session, flowFile, awaitingRouting);
        } catch (final ProcessException e) {
            context.yield();
            logger.error("Unable to transfer {} to remote host {} due to {}: {}; routing to failure", new Object[]{flowFile, hostname, e, e.getCause()});
            penalizeAndRouteToFailure(session, flowFile, awaitingRouting);
        }
    }

    /**
     * Penalizes the given FlowFile and routes it to failure, but only when it is still
     * waiting to be routed in the session.
     *
     * @param session the session that owns the FlowFile
     * @param flowFile the FlowFile that was being processed when the failure occurred; may be {@code null}
     * @param awaitingRouting whether {@code flowFile} has not been transferred to a relationship yet
     */
    private void penalizeAndRouteToFailure(final ProcessSession session, final FlowFile flowFile, final boolean awaitingRouting) {
        if (!awaitingRouting || flowFile == null) {
            return;
        }
        session.transfer(session.penalize(flowFile), REL_FAILURE);
    }

    /**
     * Attempts to identify naming or content issues with a FlowFile before it is transferred.
     *
     * <p>Checks are applied in this order: zero-byte rejection (when enabled), the NONE
     * resolution (no remote lookup at all), absence of a remote file with the same name,
     * a remote directory with the same name (always rejected), and finally the configured
     * conflict resolution strategy. An unrecognized strategy leaves the defaults in place,
     * i.e. the file is transferred and routed to success.</p>
     *
     * @param conflictResolutionType configured conflict resolution strategy, compared case-insensitively
     * @param transfer the transfer used to inspect and modify the remote system
     * @param path remote directory the file will be written to
     * @param flowFile the FlowFile being evaluated
     * @param rejectZeroByteFiles whether FlowFiles with a size of zero are rejected
     * @param logger logger used to report the decision
     * @return the routing decision, including the filename to use on the remote system
     * @throws IOException if the remote system cannot be queried or the remote file cannot be deleted
     */
    private ConflictResult identifyAndResolveConflictFile(
            final String conflictResolutionType,
            final T transfer,
            final String path,
            final FlowFile flowFile,
            final boolean rejectZeroByteFiles,
            final ComponentLog logger)
            throws IOException {
        Relationship destinationRelationship = REL_SUCCESS;
        String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
        boolean transferFile = true;
        boolean penalizeFile = false;

        // First, reject empty files when configured to do so
        if (rejectZeroByteFiles) {
            final long sizeInBytes = flowFile.getSize();
            if (sizeInBytes == 0) {
                logger.warn("Rejecting {} because it is zero bytes", new Object[]{flowFile});
                return new ConflictResult(REL_REJECT, false, fileName, true);
            }
        }

        // Second, check if the user doesn't care about detecting naming conflicts ahead of time
        if (conflictResolutionType.equalsIgnoreCase(FileTransfer.CONFLICT_RESOLUTION_NONE)) {
            return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile);
        }

        final FileInfo remoteFileInfo = transfer.getRemoteFileInfo(flowFile, path, fileName);
        if (remoteFileInfo == null) {
            // nothing with that name exists remotely, so there is no conflict to resolve
            return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile);
        }

        if (remoteFileInfo.isDirectory()) {
            logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
            return new ConflictResult(REL_REJECT, false, fileName, false);
        }

        logger.info("Discovered a filename conflict on the remote server for {} so handling using configured Conflict Resolution of {}",
                new Object[]{flowFile, conflictResolutionType});

        // Locale.ROOT keeps the case mapping independent of the JVM's default locale
        switch (conflictResolutionType.toUpperCase(Locale.ROOT)) {
            case FileTransfer.CONFLICT_RESOLUTION_REJECT:
                destinationRelationship = REL_REJECT;
                transferFile = false;
                penalizeFile = false;
                logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
                break;
            case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
                transfer.deleteFile(flowFile, path, fileName);
                destinationRelationship = REL_SUCCESS;
                transferFile = true;
                penalizeFile = false;
                logger.info("Resolving filename conflict for {} with remote server by deleting remote file and replacing with flow file", new Object[]{flowFile});
                break;
            case FileTransfer.CONFLICT_RESOLUTION_RENAME: {
                // try "1.<name>", "2.<name>", ... until a name is free on the remote system
                boolean uniqueNameGenerated = false;
                for (int i = 1; i <= MAX_RENAME_ATTEMPTS && !uniqueNameGenerated; i++) {
                    final String possibleFileName = i + "." + fileName;
                    final FileInfo renamedFileInfo = transfer.getRemoteFileInfo(flowFile, path, possibleFileName);
                    if (renamedFileInfo == null) {
                        uniqueNameGenerated = true;
                        fileName = possibleFileName;
                        logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", new Object[]{flowFile, fileName});
                        destinationRelationship = REL_SUCCESS;
                        transferFile = true;
                        penalizeFile = false;
                    }
                }
                if (!uniqueNameGenerated) {
                    destinationRelationship = REL_REJECT;
                    transferFile = false;
                    penalizeFile = false;
                    logger.warn("Could not determine a unique name after {} attempts for {}. Switching resolution mode to REJECT",
                            new Object[]{MAX_RENAME_ATTEMPTS, flowFile});
                }
                break;
            }
            case FileTransfer.CONFLICT_RESOLUTION_IGNORE:
                destinationRelationship = REL_SUCCESS;
                transferFile = false;
                penalizeFile = false;
                logger.info("Resolving conflict for {} by not transferring file and still considering the process a success.", new Object[]{flowFile});
                break;
            case FileTransfer.CONFLICT_RESOLUTION_FAIL:
                destinationRelationship = REL_FAILURE;
                transferFile = false;
                penalizeFile = true;
                logger.warn("Resolved filename conflict for {} as configured by routing to FAILURE relationship.", new Object[]{flowFile});
                break;
            default:
                // unrecognized strategy: keep the defaults (transfer the file, route to success)
                break;
        }

        return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile);
    }

    /**
     * Immutable holder for the outcome of the pre-transfer conflict check.
     */
    private static class ConflictResult {

        private final Relationship relationship;
        private final boolean transferFile;
        private final String newFileName;
        private final boolean penalizeFile;

        /**
         * @param relationship relationship the FlowFile is routed to
         * @param transferFileVal whether the content is sent to the remote system
         * @param newFileNameVal filename to use on the remote system
         * @param penalizeFileVal whether the FlowFile is penalized before routing
         */
        public ConflictResult(final Relationship relationship, final boolean transferFileVal, final String newFileNameVal, final boolean penalizeFileVal) {
            this.relationship = relationship;
            this.transferFile = transferFileVal;
            this.newFileName = newFileNameVal;
            this.penalizeFile = penalizeFileVal;
        }

        /** @return whether the content is sent to the remote system */
        public boolean isTransfer() {
            return transferFile;
        }

        /** @return whether the FlowFile is penalized before routing */
        public boolean isPenalize() {
            return penalizeFile;
        }

        /** @return filename to use on the remote system */
        public String getFileName() {
            return newFileName;
        }

        /** @return relationship the FlowFile is routed to */
        public Relationship getRelationship() {
            return relationship;
        }
    }
}