blob: 299be54c5d146481e9c9b5faa2ea53a56634067c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.file;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for remote file consumers.
*/
public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
protected final transient Logger log = LoggerFactory.getLogger(getClass());
protected GenericFileEndpoint<T> endpoint;
protected GenericFileOperations<T> operations;
protected boolean loggedIn;
protected String fileExpressionResult;
protected int maxMessagesPerPoll;
protected volatile ShutdownRunningTask shutdownRunningTask;
protected volatile int pendingExchanges;
public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
super(endpoint, processor);
this.endpoint = endpoint;
this.operations = operations;
}
/**
* Poll for files
*/
protected int poll() throws Exception {
// must reset for each poll
fileExpressionResult = null;
shutdownRunningTask = null;
pendingExchanges = 0;
// before we poll is there anything we need to check?
// such as are we connected to the FTP Server still?
if (!prePollCheck()) {
if (log.isDebugEnabled()) {
log.debug("Skipping poll as pre poll check returned false");
}
return 0;
}
// gather list of files to process
List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
String name = endpoint.getConfiguration().getDirectory();
// time how long time it takes to poll
StopWatch stop = new StopWatch();
boolean limitHit = !pollDirectory(name, files);
long delta = stop.stop();
if (log.isDebugEnabled()) {
log.debug("Took " + TimeUtils.printDuration(delta) + " to poll: " + name);
}
// log if we hit the limit
if (limitHit) {
if (log.isDebugEnabled()) {
log.debug("Limiting maximum messages to poll at " + maxMessagesPerPoll + " files as there was more messages in this poll.");
}
}
// sort files using file comparator if provided
if (endpoint.getSorter() != null) {
Collections.sort(files, endpoint.getSorter());
}
// sort using build in sorters so we can use expressions
LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
for (GenericFile<T> file : files) {
Exchange exchange = endpoint.createExchange(file);
endpoint.configureExchange(exchange);
endpoint.configureMessage(file, exchange.getIn());
exchanges.add(exchange);
}
// sort files using exchange comparator if provided
if (endpoint.getSortBy() != null) {
Collections.sort(exchanges, endpoint.getSortBy());
}
// consume files one by one
int total = exchanges.size();
if (total > 0 && log.isDebugEnabled()) {
log.debug("Total " + total + " files to consume");
}
Queue<Exchange> q = exchanges;
int polledMessages = processBatch(CastUtils.cast(q));
postPollCheck();
return polledMessages;
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
@SuppressWarnings("unchecked")
public int processBatch(Queue<Object> exchanges) {
int total = exchanges.size();
// limit if needed
if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
if (log.isDebugEnabled()) {
log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
}
total = maxMessagesPerPoll;
}
for (int index = 0; index < total && isBatchAllowed(); index++) {
// only loop if we are started (allowed to run)
// use poll to remove the head so it does not consume memory even after we have processed it
Exchange exchange = (Exchange) exchanges.poll();
// add current index and total as properties
exchange.setProperty(Exchange.BATCH_INDEX, index);
exchange.setProperty(Exchange.BATCH_SIZE, total);
exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
// update pending number of exchanges
pendingExchanges = total - index - 1;
// process the current exchange
processExchange(exchange);
}
// remove the file from the in progress list in case the batch was limited by max messages per poll
while (exchanges.size() > 0) {
Exchange exchange = (Exchange) exchanges.poll();
GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
String key = file.getAbsoluteFilePath();
endpoint.getInProgressRepository().remove(key);
}
return total;
}
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
// store a reference what to do in case when shutting down and we have pending messages
this.shutdownRunningTask = shutdownRunningTask;
// do not defer shutdown
return false;
}
public int getPendingExchangesSize() {
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
return pendingExchanges;
} else {
return 0;
}
}
public void prepareShutdown() {
// noop
}
public boolean isBatchAllowed() {
// stop if we are not running
boolean answer = isRunAllowed();
if (!answer) {
return false;
}
if (shutdownRunningTask == null) {
// we are not shutting down so continue to run
return true;
}
// we are shutting down so only continue if we are configured to complete all tasks
return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
}
/**
* Whether or not we can continue polling for more files
*
* @param fileList the current list of gathered files
* @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit
*/
public boolean canPollMoreFiles(List fileList) {
if (maxMessagesPerPoll <= 0) {
// no limitation
return true;
}
// then only poll if we haven't reached the max limit
return fileList.size() < maxMessagesPerPoll;
}
/**
* Override if required. Perform some checks (and perhaps actions) before we poll.
*
* @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll.
*/
protected boolean prePollCheck() throws Exception {
return true;
}
/**
* Override if required. Perform some checks (and perhaps actions) after we have polled.
*/
protected void postPollCheck() {
// noop
}
/**
* Polls the given directory for files to process
*
* @param fileName current directory or file
* @param fileList current list of files gathered
* @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit
*/
protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList);
/**
* Sets the operations to be used.
* <p/>
* Can be used to set a fresh operations in case of recovery attempts
*
* @param operations the operations
*/
public void setOperations(GenericFileOperations<T> operations) {
this.operations = operations;
}
/**
* Processes the exchange
*
* @param exchange the exchange
*/
protected void processExchange(final Exchange exchange) {
GenericFile<T> file = getExchangeFileProperty(exchange);
if (log.isTraceEnabled()) {
log.trace("Processing file: " + file);
}
// must extract the absolute name before the begin strategy as the file could potentially be pre moved
// and then the file name would be changed
String absoluteFileName = file.getAbsoluteFilePath();
// check if we can begin processing the file
try {
final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
if (!begin) {
if (log.isDebugEnabled()) {
log.debug(endpoint + " cannot begin processing file: " + file);
}
// begin returned false, so remove file from the in progress list as its no longer in progress
endpoint.getInProgressRepository().remove(absoluteFileName);
return;
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage(), e);
}
endpoint.getInProgressRepository().remove(absoluteFileName);
return;
}
// must use file from exchange as it can be updated due the
// preMoveNamePrefix/preMoveNamePostfix options
final GenericFile<T> target = getExchangeFileProperty(exchange);
// must use full name when downloading so we have the correct path
final String name = target.getAbsoluteFilePath();
try {
// retrieve the file using the stream
if (log.isTraceEnabled()) {
log.trace("Retrieving file: " + name + " from: " + endpoint);
}
// retrieve the file and check it was a success
boolean retrieved = operations.retrieveFile(name, exchange);
if (!retrieved) {
// throw exception to handle the problem with retrieving the file
// then if the method return false or throws an exception is handled the same in here
// as in both cases an exception is being thrown
throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
}
if (log.isTraceEnabled()) {
log.trace("Retrieved file: " + name + " from: " + endpoint);
}
// register on completion callback that does the completion strategies
// (for instance to move the file after we have processed it)
exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
if (log.isDebugEnabled()) {
log.debug("About to process file: " + target + " using exchange: " + exchange);
}
// process the exchange using the async consumer to support async routing engine
// which can be supported by this file consumer as all the done work is
// provided in the GenericFileOnCompletion
getAsyncProcessor().process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// noop
if (log.isTraceEnabled()) {
log.trace("Done processing file: " + target + (doneSync ? " synchronously" : " asynchronously"));
}
}
});
} catch (Exception e) {
// remove file from the in progress list due to failure
// (cannot be in finally block due to GenericFileOnCompletion will remove it
// from in progress when it takes over and processes the file, which may happen
// by another thread at a later time. So its only safe to remove it if there was an exception)
endpoint.getInProgressRepository().remove(absoluteFileName);
handleException(e);
}
}
/**
* Strategy for validating if the given remote file should be included or not
*
* @param file the file
* @param isDirectory whether the file is a directory or a file
* @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
*/
protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) {
if (!isMatched(file, isDirectory)) {
if (log.isTraceEnabled()) {
log.trace("File did not match. Will skip this file: " + file);
}
return false;
} else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
if (log.isTraceEnabled()) {
log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file);
}
return false;
}
// file matched
return true;
}
/**
* Strategy to perform file matching based on endpoint configuration.
* <p/>
* Will always return <tt>false</tt> for certain files/folders:
* <ul>
* <li>Starting with a dot</li>
* <li>lock files</li>
* </ul>
* And then <tt>true</tt> for directories.
*
* @param file the file
* @param isDirectory whether the file is a directory or a file
* @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not
*/
protected boolean isMatched(GenericFile<T> file, boolean isDirectory) {
String name = file.getFileNameOnly();
// folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
if (name.startsWith(".")) {
return false;
}
// lock files should be skipped
if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
return false;
}
// directories so far is always regarded as matched (matching on the name is only for files)
if (isDirectory) {
return true;
}
if (endpoint.getFilter() != null) {
if (!endpoint.getFilter().accept(file)) {
return false;
}
}
if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
if (name.matches(endpoint.getExclude())) {
return false;
}
}
if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
if (!name.matches(endpoint.getInclude())) {
return false;
}
}
// use file expression for a simple dynamic file filter
if (endpoint.getFileName() != null) {
evaluateFileExpression();
if (fileExpressionResult != null) {
if (!name.equals(fileExpressionResult)) {
return false;
}
}
}
// if done file name is enabled, then the file is only valid if a done file exists
if (endpoint.getDoneFileName() != null) {
// done file must be in same path as the file
String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
// is it a done file name?
if (endpoint.isDoneFile(file.getFileNameOnly())) {
if (log.isTraceEnabled()) {
log.trace("Skipping done file: " + file);
}
return false;
}
// the file is only valid if the done file exist
if (!operations.existsFile(doneFileName)) {
if (log.isTraceEnabled()) {
log.trace("Done file: " + doneFileName + " does not exist");
}
return false;
}
}
return true;
}
/**
* Is the given file already in progress.
*
* @param file the file
* @return <tt>true</tt> if the file is already in progress
*/
protected boolean isInProgress(GenericFile<T> file) {
String key = file.getAbsoluteFilePath();
return !endpoint.getInProgressRepository().add(key);
}
private void evaluateFileExpression() {
if (fileExpressionResult == null) {
// create a dummy exchange as Exchange is needed for expression evaluation
Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
}
}
@SuppressWarnings("unchecked")
private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
}
@Override
protected void doStart() throws Exception {
super.doStart();
// prepare on startup
endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
}
}