blob: f525b8e056e7c5bbc2c4a89b00ff0815380a89cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicemix.ftp;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.concurrent.locks.Lock;
import javax.jbi.JBIException;
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.servicemix.common.DefaultComponent;
import org.apache.servicemix.common.ServiceUnit;
import org.apache.servicemix.common.endpoints.PollingEndpoint;
import org.apache.servicemix.components.util.DefaultFileMarshaler;
import org.apache.servicemix.components.util.FileMarshaler;
import org.apache.servicemix.locks.LockManager;
import org.apache.servicemix.locks.impl.SimpleLockManager;
/**
* A polling endpoint which looks for a file or files in a directory
* and sends the files into the JBI bus as messages, deleting the files
* by default when they are processed.
*
* @org.apache.xbean.XBean element="poller"
*
* @version $Revision: 468487 $
*/
public class FtpPollerEndpoint extends PollingEndpoint implements FtpEndpointType {
private FTPClientPool clientPool;
private FileFilter filter;
private boolean deleteFile = true;
private boolean recursive = true;
private boolean changeWorkingDirectory;
private FileMarshaler marshaler = new DefaultFileMarshaler();
private LockManager lockManager;
private QName targetOperation;
private URI uri;
public FtpPollerEndpoint() {
}
public FtpPollerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
super(serviceUnit, service, endpoint);
}
public FtpPollerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
super(component, endpoint);
}
public void poll() throws Exception {
pollFileOrDirectory(getWorkingPath());
}
public void validate() throws DeploymentException {
super.validate();
if (uri == null && (getClientPool() == null || getClientPool().getHost() == null)) {
throw new DeploymentException("Property uri or clientPool.host must be configured");
}
if (uri != null && getClientPool() != null && getClientPool().getHost() != null) {
throw new DeploymentException("Properties uri and clientPool.host can not be configured at the same time");
}
if (changeWorkingDirectory && recursive) {
throw new DeploymentException("changeWorkingDirectory='true' can not be set when recursive='true'");
}
}
public void start() throws Exception {
if (lockManager == null) {
lockManager = createLockManager();
}
if (clientPool == null) {
clientPool = createClientPool();
}
if (uri != null) {
clientPool.setHost(uri.getHost());
clientPool.setPort(uri.getPort());
if (uri.getUserInfo() != null) {
String[] infos = uri.getUserInfo().split(":");
clientPool.setUsername(infos[0]);
if (infos.length > 1) {
clientPool.setPassword(infos[1]);
}
}
} else {
String str = "ftp://" + clientPool.getHost();
if (clientPool.getPort() >= 0) {
str += ":" + clientPool.getPort();
}
str += "/";
uri = new URI(str);
}
super.start();
}
protected LockManager createLockManager() {
return new SimpleLockManager();
}
private String getWorkingPath() {
return (uri != null && uri.getPath() != null) ? uri.getPath() : ".";
}
// Properties
//-------------------------------------------------------------------------
/**
* @return the clientPool
*/
public FTPClientPool getClientPool() {
return clientPool;
}
/**
* @param clientPool the clientPool to set
*/
public void setClientPool(FTPClientPool clientPool) {
this.clientPool = clientPool;
}
/**
* @return the uri
*/
public URI getUri() {
return uri;
}
/**
* @param uri the uri to set
*/
public void setUri(URI uri) {
this.uri = uri;
}
public FileFilter getFilter() {
return filter;
}
/**
* Sets the optional filter to choose which files to process
*/
public void setFilter(FileFilter filter) {
this.filter = filter;
}
/**
* Returns whether or not we should delete the file when its processed
*/
public boolean isDeleteFile() {
return deleteFile;
}
public void setDeleteFile(boolean deleteFile) {
this.deleteFile = deleteFile;
}
public boolean isRecursive() {
return recursive;
}
public void setRecursive(boolean recursive) {
this.recursive = recursive;
}
public FileMarshaler getMarshaler() {
return marshaler;
}
public void setMarshaler(FileMarshaler marshaler) {
this.marshaler = marshaler;
}
public QName getTargetOperation() { return targetOperation; }
public void setTargetOperation(QName targetOperation) { this.targetOperation = targetOperation; }
public void setChangeWorkingDirectory(boolean changeWorkingDirectory) {
this.changeWorkingDirectory = changeWorkingDirectory;
}
// Implementation methods
//-------------------------------------------------------------------------
protected void pollFileOrDirectory(String fileOrDirectory) throws Exception {
FTPClient ftp = borrowClient();
try {
logger.debug("Polling directory " + fileOrDirectory);
pollFileOrDirectory(ftp, fileOrDirectory, isRecursive());
} finally {
returnClient(ftp);
}
}
protected void pollFileOrDirectory(FTPClient ftp, String fileOrDirectory, boolean processDir) throws Exception {
FTPFile[] files = listFiles(ftp, fileOrDirectory);
for (int i = 0; i < files.length; i++) {
String name = files[i].getName();
if (".".equals(name) || "..".equals(name)) {
continue; // ignore "." and ".."
}
String file = fileOrDirectory + "/" + name;
// This is a file, process it
if (!files[i].isDirectory()) {
if (getFilter() == null || getFilter().accept(new File(file))) {
pollFile(file); // process the file
}
// Only process directories if processDir is true
} else if (processDir) {
if (logger.isDebugEnabled()) {
logger.debug("Polling directory " + file);
}
pollFileOrDirectory(ftp, file, isRecursive());
} else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping directory " + file);
}
}
}
}
private FTPFile[] listFiles(FTPClient ftp, String directory) throws IOException {
if (changeWorkingDirectory) {
ftp.changeWorkingDirectory(directory);
return ftp.listFiles("");
} else {
return ftp.listFiles(directory);
}
}
protected void pollFile(final String file) {
if (logger.isDebugEnabled()) {
logger.debug("Scheduling file " + file + " for processing");
}
getExecutor().execute(new Runnable() {
public void run() {
final Lock lock = lockManager.getLock(file);
if (lock.tryLock()) {
boolean unlock = true;
try {
unlock = processFileAndDelete(file);
} finally {
if (unlock) {
lock.unlock();
}
}
}
}
});
}
protected boolean processFileAndDelete(String file) {
FTPClient ftp = null;
boolean unlock = true;
try {
ftp = borrowClient();
if (logger.isDebugEnabled()) {
logger.debug("Processing file " + file);
}
if (ftp.listFiles(file).length > 0) {
// Process the file. If processing fails, an exception should be thrown.
processFile(ftp, file);
// Processing is successful
// We should not unlock until the file has been deleted
unlock = false;
if (isDeleteFile()) {
if (!ftp.deleteFile(file)) {
throw new IOException("Could not delete file " + file);
}
unlock = true;
}
} else {
//avoid processing files that have been deleted on the server
logger.debug("Skipping " + file + ": the file no longer exists on the server");
}
} catch (Exception e) {
logger.error("Failed to process file: " + file + ". Reason: " + e, e);
} finally {
returnClient(ftp);
}
return unlock;
}
protected void processFile(FTPClient ftp, String file) throws Exception {
InputStream in = ftp.retrieveFileStream(file);
InOnly exchange = getExchangeFactory().createInOnlyExchange();
configureExchangeTarget(exchange);
NormalizedMessage message = exchange.createMessage();
exchange.setInMessage(message);
if (getTargetOperation() != null) { exchange.setOperation(getTargetOperation()); }
marshaler.readMessage(exchange, message, in, file);
sendSync(exchange);
in.close();
ftp.completePendingCommand();
if (exchange.getStatus() == ExchangeStatus.ERROR) {
Exception e = exchange.getError();
if (e == null) {
e = new JBIException("Unkown error");
}
throw e;
}
}
public String getLocationURI() {
return uri.toString();
}
public void process(MessageExchange exchange) throws Exception {
// Do nothing. In our case, this method should never be called
// as we only send synchronous InOnly exchange
}
protected FTPClientPool createClientPool() throws Exception {
FTPClientPool pool = new FTPClientPool();
pool.afterPropertiesSet();
return pool;
}
protected FTPClient borrowClient() throws JBIException {
try {
return (FTPClient) getClientPool().borrowClient();
} catch (Exception e) {
throw new JBIException(e);
}
}
protected void returnClient(FTPClient client) {
if (client != null) {
try {
getClientPool().returnClient(client);
} catch (Exception e) {
logger.error("Failed to return client to pool: " + e, e);
}
}
}
}