blob: 5ed1f7c8424a98f8e6ce1d76816c684352e7014e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.file;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.processor.DeadLetterChannel;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Base class for remote file consumers.
 * <p/>
 * On every scheduled poll the consumer gathers candidate files through
 * {@link #pollDirectory(String, List)}, optionally sorts them, wraps each one
 * in a {@link GenericFileExchange} and hands the exchanges one by one to the
 * processor. After processing, the endpoint's {@link GenericFileProcessStrategy}
 * is either committed or rolled back.
 */
public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer {

    protected final transient Log log = LogFactory.getLog(getClass());

    /** The endpoint this consumer polls on behalf of. */
    protected GenericFileEndpoint<T> endpoint;

    /** The operations used to retrieve files and passed on to the process strategy. */
    protected GenericFileOperations<T> operations;

    // not read or written by this class; presumably maintained by subclasses that need a login
    protected boolean loggedIn;

    /** Cached result of the endpoint's file name expression; reset at the start of every poll. */
    protected String fileExpressionResult;

    /**
     * Creates the consumer.
     *
     * @param endpoint   the endpoint to consume from
     * @param processor  the processor that receives the file exchanges
     * @param operations the file operations to use
     */
    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.operations = operations;
    }

    /**
     * Poll for files.
     * <p/>
     * The poll is skipped entirely when {@link #prePollCheck()} returns
     * <tt>false</tt>.
     *
     * @throws Exception if the pre poll check fails with an exception
     */
    protected void poll() throws Exception {
        // must reset for each poll
        fileExpressionResult = null;

        // before we poll is there anything we need to check ? Such as are we
        // connected to the FTP Server Still ?
        if (!prePollCheck()) {
            log.debug("Skipping pool as pre poll check returned false");
            // honour the contract of prePollCheck: false means this poll must not run
            return;
        }

        // gather list of files to process
        List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
        String name = endpoint.getConfiguration().getDirectory();
        pollDirectory(name, files);

        // sort files using file comparator if provided
        if (endpoint.getSorter() != null) {
            Collections.sort(files, endpoint.getSorter());
        }

        // sort using build in sorters that is expression based
        // first we need to convert to GenericFileExchange objects so we can
        // sort using expressions
        List<GenericFileExchange<T>> exchanges = new ArrayList<GenericFileExchange<T>>(files.size());
        for (GenericFile<T> file : files) {
            GenericFileExchange<T> exchange = endpoint.createExchange(file);
            endpoint.configureMessage(file, exchange.getIn());
            exchanges.add(exchange);
        }

        // sort files using exchange comparator if provided
        if (endpoint.getSortBy() != null) {
            Collections.sort(exchanges, endpoint.getSortBy());
        }

        // consume files one by one
        int total = exchanges.size();
        if (total > 0 && log.isDebugEnabled()) {
            log.debug("Total " + total + " files to consume");
        }
        // only loop if we are started (allowed to run)
        for (int index = 0; index < total && isRunAllowed(); index++) {
            GenericFileExchange<T> exchange = exchanges.get(index);
            // add current index and total as headers
            exchange.getIn().setHeader(Exchange.FILE_BATCH_INDEX, index);
            exchange.getIn().setHeader(Exchange.FILE_BATCH_SIZE, total);
            processExchange(exchange);
        }
    }

    /**
     * Override if required. Perform some checks (and perhaps actions) before we
     * poll.
     *
     * @return true to poll, false to skip this poll.
     */
    protected boolean prePollCheck() throws Exception {
        return true;
    }

    /**
     * Polls the given directory for files to process
     *
     * @param fileName current directory or file
     * @param fileList current list of files gathered
     */
    protected abstract void pollDirectory(String fileName, List<GenericFile<T>> fileList);

    /**
     * Processes the exchange.
     * <p/>
     * The file is only retrieved and processed when the process strategy's
     * <tt>begin</tt> returns <tt>true</tt>; otherwise a warning is logged.
     * When processing is done the strategy is committed if the exchange did
     * not fail, and rolled back otherwise. Any exception thrown here is passed
     * to <tt>handleException</tt> instead of being propagated.
     *
     * @param exchange the exchange
     */
    protected void processExchange(final GenericFileExchange<T> exchange) {
        if (log.isTraceEnabled()) {
            log.trace("Processing remote file: " + exchange.getGenericFile());
        }

        try {
            final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();

            if (processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile())) {
                // must use file from exchange as it can be updated due the
                // preMoveNamePrefix/preMoveNamePostfix options
                final GenericFile<T> target = exchange.getGenericFile();
                // must use full name when downloading so we have the correct path
                final String name = target.getAbsoluteFilePath();

                // retrieve the file using the stream
                if (log.isTraceEnabled()) {
                    log.trace("Retreiving file: " + name + " from: " + endpoint);
                }

                operations.retrieveFile(name, exchange);

                if (log.isTraceEnabled()) {
                    log.trace("Retrieved file: " + name + " from: " + endpoint);
                }

                if (log.isDebugEnabled()) {
                    log.debug("About to process file: " + target + " using exchange: " + exchange);
                }

                // Use the async processor interface so that processing of
                // the exchange can happen asynchronously
                getAsyncProcessor().process(exchange, new AsyncCallback() {
                    public void done(boolean sync) {
                        final GenericFile<T> file = exchange.getGenericFile();
                        boolean failed = exchange.isFailed();

                        if (log.isDebugEnabled()) {
                            log.debug("Done processing file: " + file + " using exchange: " + exchange);
                        }

                        boolean committed = false;
                        try {
                            if (!failed) {
                                // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
                                processStrategyCommit(processStrategy, exchange, file);
                                committed = true;
                            } else {
                                // there was an exception but it was not handled by the DeadLetterChannel
                                handleException(exchange.getException());
                            }
                        } finally {
                            // roll back whenever the commit was not reached
                            if (!committed) {
                                processStrategyRollback(processStrategy, exchange, file);
                            }
                        }
                    }
                });
            } else {
                log.warn(endpoint + " cannot process remote file: " + exchange.getGenericFile());
            }
        } catch (Exception e) {
            handleException(e);
        }
    }

    /**
     * Strategy when the file was processed and a commit should be executed.
     * <p/>
     * If the endpoint is idempotent the file name is added to the idempotent
     * repository before the commit is attempted. An exception thrown by the
     * commit is passed to <tt>handleException</tt>.
     *
     * @param processStrategy the strategy to perform the commit
     * @param exchange        the exchange
     * @param file            the file processed
     */
    @SuppressWarnings("unchecked")
    protected void processStrategyCommit(GenericFileProcessStrategy<T> processStrategy,
                                         GenericFileExchange<T> exchange, GenericFile<T> file) {
        if (endpoint.isIdempotent()) {
            // only add to idempotent repository if we could process the file
            // only use the filename as the key as the file could be moved into a done folder
            endpoint.getIdempotentRepository().add(file.getFileName());
        }

        try {
            if (log.isTraceEnabled()) {
                log.trace("Committing remote file strategy: " + processStrategy + " for file: " + file);
            }
            processStrategy.commit(operations, endpoint, exchange, file);
        } catch (Exception e) {
            handleException(e);
        }
    }

    /**
     * Strategy when the file was not processed and a rollback should be
     * executed.
     * <p/>
     * An exception thrown by the rollback is passed to
     * <tt>handleException</tt>.
     *
     * @param processStrategy the strategy to perform the rollback
     * @param exchange        the exchange
     * @param file            the file processed
     */
    protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy,
                                           GenericFileExchange<T> exchange, GenericFile<T> file) {
        if (log.isWarnEnabled()) {
            log.warn("Rolling back remote file strategy: " + processStrategy + " for file: " + file);
        }

        try {
            processStrategy.rollback(operations, endpoint, exchange, file);
        } catch (Exception e) {
            handleException(e);
        }
    }

    /**
     * Strategy for validating if the given remote file should be included or
     * not
     *
     * @param file        the remote file
     * @param isDirectory whether the file is a directory or a file
     * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
     */
    @SuppressWarnings("unchecked")
    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) {
        if (!isMatched(file, isDirectory)) {
            if (log.isTraceEnabled()) {
                log.trace("Remote file did not match. Will skip this remote file: " + file);
            }
            return false;
        } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) {
            // only use the filename as the key as the file could be moved into a done folder
            if (log.isTraceEnabled()) {
                log.trace("RemoteFileConsumer is idempotent and the file has been consumed before. Will skip this remote file: " + file);
            }
            return false;
        }

        // file matched
        return true;
    }

    /**
     * Strategy to perform file matching based on endpoint configuration.
     * <p/>
     * Will always return <tt>false</tt> for certain files/folders:
     * <ul>
     * <li>Starting with a dot</li>
     * <li>lock files</li>
     * </ul>
     * And then <tt>true</tt> for directories.
     * <p/>
     * Remaining files are checked against the endpoint's filter, exclude and
     * include patterns, and finally the file name expression if one is set.
     *
     * @param file        the remote file
     * @param isDirectory whether the file is a directory or a file
     * @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not
     */
    protected boolean isMatched(GenericFile<T> file, boolean isDirectory) {
        String name = file.getFileNameOnly();

        // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
        if (name.startsWith(".")) {
            return false;
        }

        // lock files should be skipped
        if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
            return false;
        }

        // directories so far is always regarded as matched (matching on the name is only for files)
        if (isDirectory) {
            return true;
        }

        if (endpoint.getFilter() != null) {
            if (!endpoint.getFilter().accept(file)) {
                return false;
            }
        }

        if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
            if (name.matches(endpoint.getExclude())) {
                return false;
            }
        }

        if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
            if (!name.matches(endpoint.getInclude())) {
                return false;
            }
        }

        // use file expression for a simple dynamic file filter
        if (endpoint.getFileName() != null) {
            evaluateFileExpression();
            if (fileExpressionResult != null) {
                if (!name.equals(fileExpressionResult)) {
                    return false;
                }
            }
        }

        return true;
    }

    /**
     * Evaluates the endpoint's file name expression and caches the result in
     * {@link #fileExpressionResult}. Does nothing if a result is already
     * cached for the current poll.
     */
    private void evaluateFileExpression() {
        if (fileExpressionResult == null) {
            // create a dummy exchange as Exchange is needed for expression evaluation
            Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
            fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
        }
    }
}