blob: 1ac3619bbc1cc6beda0056cda4c49fe63fe1570d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.file;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.ExpressionIllegalSyntaxException;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.converter.IOConverter;
import org.apache.camel.impl.ScheduledPollEndpoint;
import org.apache.camel.language.simple.SimpleLanguage;
import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.Language;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Generic FileEndpoint
*/
public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint {
protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory";
protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
protected final transient Logger log = LoggerFactory.getLogger(getClass());
protected GenericFileProcessStrategy<T> processStrategy;
protected GenericFileConfiguration configuration;
protected IdempotentRepository<String> inProgressRepository = new MemoryIdempotentRepository();
protected String localWorkDirectory;
protected boolean autoCreate = true;
protected boolean startingDirectoryMustExist;
protected boolean directoryMustExist;
protected int bufferSize = 128 * 1024;
protected GenericFileExist fileExist = GenericFileExist.Override;
protected boolean noop;
protected boolean recursive;
protected boolean delete;
protected boolean flatten;
protected int maxMessagesPerPoll;
protected String tempPrefix;
protected Expression tempFileName;
protected boolean eagerDeleteTargetFile = true;
protected String include;
protected String exclude;
protected String charset;
protected Expression fileName;
protected Expression move;
protected Expression moveFailed;
protected Expression preMove;
protected Boolean idempotent;
protected IdempotentRepository<String> idempotentRepository;
protected GenericFileFilter<T> filter;
protected Comparator<GenericFile<T>> sorter;
protected Comparator<Exchange> sortBy;
protected String readLock = "none";
protected long readLockCheckInterval = 1000;
protected long readLockTimeout = 10000;
protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
protected boolean keepLastModified;
protected String doneFileName;
public GenericFileEndpoint() {
}
public GenericFileEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
}
public boolean isSingleton() {
return true;
}
public abstract GenericFileConsumer<T> createConsumer(Processor processor) throws Exception;
public abstract GenericFileProducer<T> createProducer() throws Exception;
public abstract Exchange createExchange(GenericFile<T> file);
public abstract String getScheme();
public abstract char getFileSeparator();
public abstract boolean isAbsolute(String name);
/**
* Return the file name that will be auto-generated for the given message if
* none is provided
*/
public String getGeneratedFileName(Message message) {
return StringHelper.sanitize(message.getMessageId());
}
public GenericFileProcessStrategy<T> getGenericFileProcessStrategy() {
if (processStrategy == null) {
processStrategy = createGenericFileStrategy();
if (log.isDebugEnabled()) {
log.debug("Using Generic file process strategy: " + processStrategy);
}
}
return processStrategy;
}
/**
* A strategy method to lazily create the file strategy
*/
@SuppressWarnings("unchecked")
protected GenericFileProcessStrategy<T> createGenericFileStrategy() {
Class<?> factory = null;
try {
FactoryFinder finder = getCamelContext().getFactoryFinder("META-INF/services/org/apache/camel/component/");
if (log.isTraceEnabled()) {
log.trace("Using FactoryFinder: " + finder);
}
factory = finder.findClass(getScheme(), "strategy.factory.");
} catch (ClassNotFoundException e) {
if (log.isTraceEnabled()) {
log.trace("'strategy.factory.class' not found", e);
}
} catch (IOException e) {
if (log.isTraceEnabled()) {
log.trace("No strategy factory defined in 'META-INF/services/org/apache/camel/component/'", e);
}
}
if (factory == null) {
// use default
try {
if (log.isTraceEnabled()) {
log.trace("Using ClassResolver to resolve class: " + DEFAULT_STRATEGYFACTORY_CLASS);
}
factory = this.getCamelContext().getClassResolver().resolveClass(DEFAULT_STRATEGYFACTORY_CLASS);
} catch (Exception e) {
if (log.isTraceEnabled()) {
log.trace("Cannot load class: " + DEFAULT_STRATEGYFACTORY_CLASS, e);
}
}
// fallback and us this class loader
try {
if (log.isTraceEnabled()) {
log.trace("Using classloader: " + this.getClass().getClassLoader() + " to resolve class: " + DEFAULT_STRATEGYFACTORY_CLASS);
}
factory = this.getCamelContext().getClassResolver().resolveClass(DEFAULT_STRATEGYFACTORY_CLASS, this.getClass().getClassLoader());
} catch (Exception e) {
if (log.isTraceEnabled()) {
log.trace("Cannot load class: " + DEFAULT_STRATEGYFACTORY_CLASS + " using classloader: " + this.getClass().getClassLoader(), e);
}
}
if (factory == null) {
throw new TypeNotPresentException(DEFAULT_STRATEGYFACTORY_CLASS + " class not found", null);
}
}
try {
Method factoryMethod = factory.getMethod("createGenericFileProcessStrategy", CamelContext.class, Map.class);
return (GenericFileProcessStrategy<T>) ObjectHelper.invokeMethod(factoryMethod, null, getCamelContext(), getParamsAsMap());
} catch (NoSuchMethodException e) {
throw new TypeNotPresentException(factory.getSimpleName() + ".createGenericFileProcessStrategy method not found", e);
}
}
public boolean isNoop() {
return noop;
}
public void setNoop(boolean noop) {
this.noop = noop;
}
public boolean isRecursive() {
return recursive;
}
public void setRecursive(boolean recursive) {
this.recursive = recursive;
}
public String getInclude() {
return include;
}
public void setInclude(String include) {
this.include = include;
}
public String getExclude() {
return exclude;
}
public void setExclude(String exclude) {
this.exclude = exclude;
}
public boolean isDelete() {
return delete;
}
public void setDelete(boolean delete) {
this.delete = delete;
}
public boolean isFlatten() {
return flatten;
}
public void setFlatten(boolean flatten) {
this.flatten = flatten;
}
public Expression getMove() {
return move;
}
public void setMove(Expression move) {
this.move = move;
}
/**
* Sets the move failure expression based on
* {@link org.apache.camel.language.simple.SimpleLanguage}
*/
public void setMoveFailed(String fileLanguageExpression) {
String expression = configureMoveOrPreMoveExpression(fileLanguageExpression);
this.moveFailed = createFileLanguageExpression(expression);
}
public Expression getMoveFailed() {
return moveFailed;
}
public void setMoveFailed(Expression moveFailed) {
this.moveFailed = moveFailed;
}
/**
* Sets the move expression based on
* {@link org.apache.camel.language.simple.SimpleLanguage}
*/
public void setMove(String fileLanguageExpression) {
String expression = configureMoveOrPreMoveExpression(fileLanguageExpression);
this.move = createFileLanguageExpression(expression);
}
public Expression getPreMove() {
return preMove;
}
public void setPreMove(Expression preMove) {
this.preMove = preMove;
}
/**
* Sets the pre move expression based on
* {@link org.apache.camel.language.simple.SimpleLanguage}
*/
public void setPreMove(String fileLanguageExpression) {
String expression = configureMoveOrPreMoveExpression(fileLanguageExpression);
this.preMove = createFileLanguageExpression(expression);
}
public Expression getFileName() {
return fileName;
}
public void setFileName(Expression fileName) {
this.fileName = fileName;
}
/**
* Sets the file expression based on
* {@link org.apache.camel.language.simple.SimpleLanguage}
*/
public void setFileName(String fileLanguageExpression) {
this.fileName = createFileLanguageExpression(fileLanguageExpression);
}
public String getDoneFileName() {
return doneFileName;
}
/**
* Sets the done file name.
* <p/>
* Only ${file.name} and ${file.name.noext} is supported as dynamic placeholders.
*/
public void setDoneFileName(String doneFileName) {
this.doneFileName = doneFileName;
}
public Boolean isIdempotent() {
return idempotent != null ? idempotent : false;
}
public String getCharset() {
return charset;
}
public void setCharset(String charset) {
IOConverter.validateCharset(charset);
this.charset = charset;
}
boolean isIdempotentSet() {
return idempotent != null;
}
public void setIdempotent(Boolean idempotent) {
this.idempotent = idempotent;
}
public IdempotentRepository<String> getIdempotentRepository() {
return idempotentRepository;
}
public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) {
this.idempotentRepository = idempotentRepository;
}
public GenericFileFilter<T> getFilter() {
return filter;
}
public void setFilter(GenericFileFilter<T> filter) {
this.filter = filter;
}
public Comparator<GenericFile<T>> getSorter() {
return sorter;
}
public void setSorter(Comparator<GenericFile<T>> sorter) {
this.sorter = sorter;
}
public Comparator<Exchange> getSortBy() {
return sortBy;
}
public void setSortBy(Comparator<Exchange> sortBy) {
this.sortBy = sortBy;
}
public void setSortBy(String expression) {
setSortBy(expression, false);
}
public void setSortBy(String expression, boolean reverse) {
setSortBy(GenericFileDefaultSorter.sortByFileLanguage(getCamelContext(), expression, reverse));
}
public String getTempPrefix() {
return tempPrefix;
}
/**
* Enables and uses temporary prefix when writing files, after write it will
* be renamed to the correct name.
*/
public void setTempPrefix(String tempPrefix) {
this.tempPrefix = tempPrefix;
// use only name as we set a prefix in from on the name
setTempFileName(tempPrefix + "${file:onlyname}");
}
public Expression getTempFileName() {
return tempFileName;
}
public void setTempFileName(Expression tempFileName) {
this.tempFileName = tempFileName;
}
public void setTempFileName(String tempFileNameExpression) {
this.tempFileName = createFileLanguageExpression(tempFileNameExpression);
}
public boolean isEagerDeleteTargetFile() {
return eagerDeleteTargetFile;
}
public void setEagerDeleteTargetFile(boolean eagerDeleteTargetFile) {
this.eagerDeleteTargetFile = eagerDeleteTargetFile;
}
public GenericFileConfiguration getConfiguration() {
if (configuration == null) {
configuration = new GenericFileConfiguration();
}
return configuration;
}
public void setConfiguration(GenericFileConfiguration configuration) {
this.configuration = configuration;
}
public GenericFileExclusiveReadLockStrategy<T> getExclusiveReadLockStrategy() {
return exclusiveReadLockStrategy;
}
public void setExclusiveReadLockStrategy(GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy) {
this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
}
public String getReadLock() {
return readLock;
}
public void setReadLock(String readLock) {
this.readLock = readLock;
}
public long getReadLockCheckInterval() {
return readLockCheckInterval;
}
public void setReadLockCheckInterval(long readLockCheckInterval) {
this.readLockCheckInterval = readLockCheckInterval;
}
public long getReadLockTimeout() {
return readLockTimeout;
}
public void setReadLockTimeout(long readLockTimeout) {
this.readLockTimeout = readLockTimeout;
}
public int getBufferSize() {
return bufferSize;
}
public void setBufferSize(int bufferSize) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("BufferSize must be a positive value, was " + bufferSize);
}
this.bufferSize = bufferSize;
}
public GenericFileExist getFileExist() {
return fileExist;
}
public void setFileExist(GenericFileExist fileExist) {
this.fileExist = fileExist;
}
public boolean isAutoCreate() {
return autoCreate;
}
public void setAutoCreate(boolean autoCreate) {
this.autoCreate = autoCreate;
}
public boolean isStartingDirectoryMustExist() {
return startingDirectoryMustExist;
}
public void setStartingDirectoryMustExist(boolean startingDirectoryMustExist) {
this.startingDirectoryMustExist = startingDirectoryMustExist;
}
public boolean isDirectoryMustExist() {
return directoryMustExist;
}
public void setDirectoryMustExist(boolean directoryMustExist) {
this.directoryMustExist = directoryMustExist;
}
public GenericFileProcessStrategy<T> getProcessStrategy() {
return processStrategy;
}
public void setProcessStrategy(GenericFileProcessStrategy<T> processStrategy) {
this.processStrategy = processStrategy;
}
public String getLocalWorkDirectory() {
return localWorkDirectory;
}
public void setLocalWorkDirectory(String localWorkDirectory) {
this.localWorkDirectory = localWorkDirectory;
}
public int getMaxMessagesPerPoll() {
return maxMessagesPerPoll;
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
public IdempotentRepository<String> getInProgressRepository() {
return inProgressRepository;
}
public void setInProgressRepository(IdempotentRepository<String> inProgressRepository) {
this.inProgressRepository = inProgressRepository;
}
public boolean isKeepLastModified() {
return keepLastModified;
}
public void setKeepLastModified(boolean keepLastModified) {
this.keepLastModified = keepLastModified;
}
/**
* Configures the given message with the file which sets the body to the
* file object.
*/
public void configureMessage(GenericFile<T> file, Message message) {
message.setBody(file);
if (flatten) {
// when flatten the file name should not contain any paths
message.setHeader(Exchange.FILE_NAME, file.getFileNameOnly());
} else {
// compute name to set on header that should be relative to starting directory
String name = file.isAbsolute() ? file.getAbsoluteFilePath() : file.getRelativeFilePath();
// skip leading endpoint configured directory
String endpointPath = getConfiguration().getDirectory() + getFileSeparator();
if (ObjectHelper.isNotEmpty(endpointPath) && name.startsWith(endpointPath)) {
name = ObjectHelper.after(name, endpointPath);
}
// adjust filename
message.setHeader(Exchange.FILE_NAME, name);
}
}
/**
* Set up the exchange properties with the options of the file endpoint
*
* @param exchange
*/
public void configureExchange(Exchange exchange) {
// Now we just set the charset property here
if (getCharset() != null) {
exchange.setProperty(Exchange.CHARSET_NAME, getCharset());
}
}
/**
* Strategy to configure the move or premove option based on a String input.
* <p/>
*
* @param expression the original string input
* @return configured string or the original if no modifications is needed
*/
protected String configureMoveOrPreMoveExpression(String expression) {
// if the expression already have ${ } placeholders then pass it unmodified
if (SimpleLanguage.hasStartToken(expression)) {
return expression;
}
// remove trailing slash
expression = FileUtil.stripTrailingSeparator(expression);
StringBuilder sb = new StringBuilder();
// if relative then insert start with the parent folder
if (!isAbsolute(expression)) {
sb.append("${file:parent}");
sb.append(getFileSeparator());
}
// insert the directory the end user provided
sb.append(expression);
// append only the filename (file:name can contain a relative path, so we must use onlyname)
sb.append(getFileSeparator());
sb.append("${file:onlyname}");
return sb.toString();
}
protected Map<String, Object> getParamsAsMap() {
Map<String, Object> params = new HashMap<String, Object>();
if (isNoop()) {
params.put("noop", Boolean.toString(true));
}
if (isDelete()) {
params.put("delete", Boolean.toString(true));
}
if (move != null) {
params.put("move", move);
}
if (moveFailed != null) {
params.put("moveFailed", moveFailed);
}
if (preMove != null) {
params.put("preMove", preMove);
}
if (exclusiveReadLockStrategy != null) {
params.put("exclusiveReadLockStrategy", exclusiveReadLockStrategy);
}
if (readLock != null) {
params.put("readLock", readLock);
}
if (readLockCheckInterval > 0) {
params.put("readLockCheckInterval", readLockCheckInterval);
}
if (readLockTimeout > 0) {
params.put("readLockTimeout", readLockTimeout);
}
return params;
}
private Expression createFileLanguageExpression(String expression) {
Language language;
// only use file language if the name is complex (eg. using $)
if (expression.contains("$")) {
language = getCamelContext().resolveLanguage("file");
} else {
language = getCamelContext().resolveLanguage("constant");
}
return language.createExpression(expression);
}
/**
* Creates the associated name of the done file based on the given file name.
* <p/>
* This method should only be invoked if a done filename property has been set on this endpoint.
*
* @param fileName the file name
* @return name of the associated done file name
*/
protected String createDoneFileName(String fileName) {
String pattern = getDoneFileName();
ObjectHelper.notEmpty(pattern, "doneFileName", pattern);
// we only support ${file:name} or ${file:name.noext} as dynamic placeholders for done files
String path = FileUtil.onlyPath(fileName);
String onlyName = FileUtil.stripPath(fileName);
pattern = pattern.replaceFirst("\\$\\{file:name\\}", onlyName);
pattern = pattern.replaceFirst("\\$simple\\{file:name\\}", onlyName);
pattern = pattern.replaceFirst("\\$\\{file:name.noext\\}", FileUtil.stripExt(onlyName));
pattern = pattern.replaceFirst("\\$simple\\{file:name.noext\\}", FileUtil.stripExt(onlyName));
// must be able to resolve all placeholders supported
if (SimpleLanguage.hasStartToken(pattern)) {
throw new ExpressionIllegalSyntaxException(fileName + ". Cannot resolve reminder: " + pattern);
}
String answer = pattern;
if (ObjectHelper.isNotEmpty(path) && ObjectHelper.isNotEmpty(pattern)) {
// done file must always be in same directory as the real file name
answer = path + File.separator + pattern;
}
if (getConfiguration().needToNormalize()) {
// must normalize path to cater for Windows and other OS
answer = FileUtil.normalizePath(answer);
}
return answer;
}
/**
* Is the given file a done file?
* <p/>
* This method should only be invoked if a done filename property has been set on this endpoint.
*
* @param fileName the file name
* @return <tt>true</tt> if its a done file, <tt>false</tt> otherwise
*/
protected boolean isDoneFile(String fileName) {
String pattern = getDoneFileName();
ObjectHelper.notEmpty(pattern, "doneFileName", pattern);
if (!SimpleLanguage.hasStartToken(pattern)) {
// no tokens, so just match names directly
return pattern.equals(fileName);
}
// the static part of the pattern, is that a prefix or suffix?
// its a prefix if ${ start token is not at the start of the pattern
boolean prefix = pattern.indexOf("${") > 0;
// remove dynamic parts of the pattern so we only got the static part left
pattern = pattern.replaceFirst("\\$\\{file:name\\}", "");
pattern = pattern.replaceFirst("\\$simple\\{file:name\\}", "");
pattern = pattern.replaceFirst("\\$\\{file:name.noext\\}", "");
pattern = pattern.replaceFirst("\\$simple\\{file:name.noext\\}", "");
// must be able to resolve all placeholders supported
if (SimpleLanguage.hasStartToken(pattern)) {
throw new ExpressionIllegalSyntaxException(fileName + ". Cannot resolve reminder: " + pattern);
}
if (prefix) {
return fileName.startsWith(pattern);
} else {
return fileName.endsWith(pattern);
}
}
}