| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.processors.standard; |
| |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.processor.AbstractProcessor; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.FlowFileAccessException; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.io.InputStreamCallback; |
| import org.apache.nifi.processors.standard.util.FileInfo; |
| import org.apache.nifi.processors.standard.util.FileTransfer; |
| import org.apache.nifi.processors.standard.util.SFTPTransfer; |
| import org.apache.nifi.util.StopWatch; |
| import org.apache.nifi.util.StringUtils; |
| |
| import java.io.BufferedInputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * Base class for PutFTP & PutSFTP |
| * |
| * @param <T> type of transfer |
| */ |
| public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractProcessor { |
| |
| public static final Relationship REL_SUCCESS = new Relationship.Builder() |
| .name("success") |
| .description("FlowFiles that are successfully sent will be routed to success") |
| .build(); |
| public static final Relationship REL_FAILURE = new Relationship.Builder() |
| .name("failure") |
| .description("FlowFiles that failed to send to the remote system; failure is usually looped back to this processor") |
| .build(); |
| public static final Relationship REL_REJECT = new Relationship.Builder() |
| .name("reject") |
| .description("FlowFiles that were rejected by the destination system") |
| .build(); |
| |
| private final Set<Relationship> relationships; |
| |
| public PutFileTransfer() { |
| super(); |
| final Set<Relationship> relationships = new HashSet<>(); |
| relationships.add(REL_SUCCESS); |
| relationships.add(REL_FAILURE); |
| relationships.add(REL_REJECT); |
| this.relationships = Collections.unmodifiableSet(relationships); |
| } |
| |
| @Override |
| public Set<Relationship> getRelationships() { |
| return relationships; |
| } |
| |
| protected abstract T getFileTransfer(final ProcessContext context); |
| |
| protected void beforePut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException { |
| |
| } |
| |
| protected void afterPut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException { |
| |
| } |
| |
| @Override |
| public void onTrigger(final ProcessContext context, final ProcessSession session) { |
| FlowFile flowFile = session.get(); |
| if (flowFile == null) { |
| return; |
| } |
| |
| final ComponentLog logger = getLogger(); |
| String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); |
| |
| final int maxNumberOfFiles = context.getProperty(FileTransfer.BATCH_SIZE).asInteger(); |
| int fileCount = 0; |
| try (final T transfer = getFileTransfer(context)) { |
| do { |
| //check if hostname is regular expression requiring evaluation |
| if(context.getProperty(FileTransfer.HOSTNAME).isExpressionLanguagePresent()) { |
| hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); |
| } |
| final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue(); |
| final String workingDirPath; |
| if (StringUtils.isBlank(rootPath)) { |
| workingDirPath = transfer.getHomeDirectory(flowFile); |
| } else { |
| workingDirPath = transfer.getAbsolutePath(flowFile, rootPath); |
| } |
| |
| final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean(); |
| final ConflictResult conflictResult |
| = identifyAndResolveConflictFile(context.getProperty(FileTransfer.CONFLICT_RESOLUTION).getValue(), transfer, workingDirPath, flowFile, rejectZeroByteFiles, logger); |
| |
| if (conflictResult.isTransfer()) { |
| final StopWatch stopWatch = new StopWatch(); |
| stopWatch.start(); |
| |
| beforePut(flowFile, context, transfer); |
| final FlowFile flowFileToTransfer = flowFile; |
| final AtomicReference<String> fullPathRef = new AtomicReference<>(null); |
| session.read(flowFile, new InputStreamCallback() { |
| @Override |
| public void process(final InputStream in) throws IOException { |
| try (final InputStream bufferedIn = new BufferedInputStream(in)) { |
| if (workingDirPath != null && context.getProperty(SFTPTransfer.CREATE_DIRECTORY).asBoolean()) { |
| transfer.ensureDirectoryExists(flowFileToTransfer, new File(workingDirPath)); |
| } |
| |
| fullPathRef.set(transfer.put(flowFileToTransfer, workingDirPath, conflictResult.getFileName(), bufferedIn)); |
| } |
| } |
| }); |
| afterPut(flowFile, context, transfer); |
| |
| stopWatch.stop(); |
| final String dataRate = stopWatch.calculateDataRate(flowFile.getSize()); |
| final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); |
| logger.info("Successfully transferred {} to {} on remote host {} in {} milliseconds at a rate of {}", |
| new Object[]{flowFile, fullPathRef.get(), hostname, millis, dataRate}); |
| |
| String fullPathWithSlash = fullPathRef.get(); |
| if (!fullPathWithSlash.startsWith("/")) { |
| fullPathWithSlash = "/" + fullPathWithSlash; |
| } |
| final String destinationUri = transfer.getProtocolName() + "://" + hostname + fullPathWithSlash; |
| session.getProvenanceReporter().send(flowFile, destinationUri, millis); |
| } |
| |
| if (conflictResult.isPenalize()) { |
| flowFile = session.penalize(flowFile); |
| } |
| |
| session.transfer(flowFile, conflictResult.getRelationship()); |
| session.commitAsync(); |
| } while (isScheduled() |
| && (getRelationships().size() == context.getAvailableRelationships().size()) |
| && (++fileCount < maxNumberOfFiles) |
| && ((flowFile = session.get()) != null)); |
| } catch (final IOException e) { |
| context.yield(); |
| logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e}); |
| flowFile = session.penalize(flowFile); |
| session.transfer(flowFile, REL_FAILURE); |
| } catch (final FlowFileAccessException e) { |
| context.yield(); |
| logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e.getCause()}); |
| flowFile = session.penalize(flowFile); |
| session.transfer(flowFile, REL_FAILURE); |
| } catch (final ProcessException e) { |
| context.yield(); |
| logger.error("Unable to transfer {} to remote host {} due to {}: {}; routing to failure", new Object[]{flowFile, hostname, e, e.getCause()}); |
| flowFile = session.penalize(flowFile); |
| session.transfer(flowFile, REL_FAILURE); |
| } |
| } |
| |
| //Attempts to identify naming or content issues with files before they are transferred. |
| private ConflictResult identifyAndResolveConflictFile( |
| final String conflictResolutionType, |
| final T transfer, |
| final String path, |
| final FlowFile flowFile, |
| final boolean rejectZeroByteFiles, |
| final ComponentLog logger) |
| throws IOException { |
| Relationship destinationRelationship = REL_SUCCESS; |
| String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); |
| boolean transferFile = true; |
| boolean penalizeFile = false; |
| |
| //First, check if the file is empty |
| //Reject files that are zero bytes or less |
| if (rejectZeroByteFiles) { |
| final long sizeInBytes = flowFile.getSize(); |
| if (sizeInBytes == 0) { |
| logger.warn("Rejecting {} because it is zero bytes", new Object[]{flowFile}); |
| return new ConflictResult(REL_REJECT, false, fileName, true); |
| } |
| } |
| |
| //Second, check if the user doesn't care about detecting naming conflicts ahead of time |
| if (conflictResolutionType.equalsIgnoreCase(FileTransfer.CONFLICT_RESOLUTION_NONE)) { |
| return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile); |
| } |
| |
| final FileInfo remoteFileInfo = transfer.getRemoteFileInfo(flowFile, path, fileName); |
| if (remoteFileInfo == null) { |
| return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile); |
| } |
| |
| if (remoteFileInfo.isDirectory()) { |
| logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile}); |
| return new ConflictResult(REL_REJECT, false, fileName, false); |
| } |
| |
| logger.info("Discovered a filename conflict on the remote server for {} so handling using configured Conflict Resolution of {}", |
| new Object[]{flowFile, conflictResolutionType}); |
| |
| switch (conflictResolutionType.toUpperCase()) { |
| case FileTransfer.CONFLICT_RESOLUTION_REJECT: |
| destinationRelationship = REL_REJECT; |
| transferFile = false; |
| penalizeFile = false; |
| logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile}); |
| break; |
| case FileTransfer.CONFLICT_RESOLUTION_REPLACE: |
| transfer.deleteFile(flowFile, path, fileName); |
| destinationRelationship = REL_SUCCESS; |
| transferFile = true; |
| penalizeFile = false; |
| logger.info("Resolving filename conflict for {} with remote server by deleting remote file and replacing with flow file", new Object[]{flowFile}); |
| break; |
| case FileTransfer.CONFLICT_RESOLUTION_RENAME: |
| boolean uniqueNameGenerated = false; |
| for (int i = 1; i < 100 && !uniqueNameGenerated; i++) { |
| String possibleFileName = i + "." + fileName; |
| |
| final FileInfo renamedFileInfo = transfer.getRemoteFileInfo(flowFile, path, possibleFileName); |
| uniqueNameGenerated = (renamedFileInfo == null); |
| if (uniqueNameGenerated) { |
| fileName = possibleFileName; |
| logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", new Object[]{flowFile, fileName}); |
| destinationRelationship = REL_SUCCESS; |
| transferFile = true; |
| penalizeFile = false; |
| break; |
| } |
| } |
| if (!uniqueNameGenerated) { |
| destinationRelationship = REL_REJECT; |
| transferFile = false; |
| penalizeFile = false; |
| logger.warn("Could not determine a unique name after 99 attempts for. Switching resolution mode to REJECT for " + flowFile); |
| } |
| break; |
| case FileTransfer.CONFLICT_RESOLUTION_IGNORE: |
| destinationRelationship = REL_SUCCESS; |
| transferFile = false; |
| penalizeFile = false; |
| logger.info("Resolving conflict for {} by not transferring file and and still considering the process a success.", new Object[]{flowFile}); |
| break; |
| case FileTransfer.CONFLICT_RESOLUTION_FAIL: |
| destinationRelationship = REL_FAILURE; |
| transferFile = false; |
| penalizeFile = true; |
| logger.warn("Resolved filename conflict for {} as configured by routing to FAILURE relationship.", new Object[]{flowFile}); |
| default: |
| break; |
| } |
| |
| return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile); |
| } |
| |
| /** |
| * static inner class to hold conflict data |
| */ |
| private static class ConflictResult { |
| |
| final Relationship relationship; |
| final boolean transferFile; |
| final String newFileName; |
| final boolean penalizeFile; |
| |
| public ConflictResult(final Relationship relationship, final boolean transferFileVal, final String newFileNameVal, final boolean penalizeFileVal) { |
| this.relationship = relationship; |
| this.transferFile = transferFileVal; |
| this.newFileName = newFileNameVal; |
| this.penalizeFile = penalizeFileVal; |
| } |
| |
| public boolean isTransfer() { |
| return transferFile; |
| } |
| |
| public boolean isPenalize() { |
| return penalizeFile; |
| } |
| |
| public String getFileName() { |
| return newFileName; |
| } |
| |
| public Relationship getRelationship() { |
| return relationship; |
| } |
| } |
| } |