blob: 828eae0bfb3a3317998bcda5b7746491917255d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.file;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.PollingConsumerPollStrategy;
import org.apache.camel.support.EventDrivenPollingConsumer;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.StopWatch;
public class GenericFilePollingConsumer extends EventDrivenPollingConsumer {
private final long delay;
public GenericFilePollingConsumer(GenericFileEndpoint endpoint) throws Exception {
super(endpoint);
this.delay = endpoint.getDelay();
}
@Override
protected Consumer createConsumer() throws Exception {
// lets add ourselves as a consumer
GenericFileConsumer consumer = (GenericFileConsumer) super.createConsumer();
// do not start scheduler as we poll manually
consumer.setStartScheduler(false);
// when using polling consumer we poll only 1 file per poll so we can limit
consumer.setMaxMessagesPerPoll(1);
// however do not limit eager as we may sort the files and thus need to do a full scan so we can sort afterwards
consumer.setEagerLimitMaxMessagesPerPoll(false);
// we only want to poll once so disconnect by default
return consumer;
}
@Override
protected void doStart() throws Exception {
super.doStart();
// ensure consumer is started
ServiceHelper.startService(getConsumer());
}
@Override
protected void doStop() throws Exception {
super.doStop();
}
@Override
protected void doShutdown() throws Exception {
super.doShutdown();
}
@Override
protected GenericFileConsumer getConsumer() {
return (GenericFileConsumer) super.getConsumer();
}
@Override
public Exchange receiveNoWait() {
if (log.isTraceEnabled()) {
log.trace("receiveNoWait polling file: {}", getConsumer().getEndpoint());
}
int polled = doReceive(0);
if (polled > 0) {
return super.receive(0);
} else {
return null;
}
}
@Override
public Exchange receive() {
if (log.isTraceEnabled()) {
log.trace("receive polling file: {}", getConsumer().getEndpoint());
}
int polled = doReceive(Long.MAX_VALUE);
if (polled > 0) {
return super.receive();
} else {
return null;
}
}
@Override
public Exchange receive(long timeout) {
if (log.isTraceEnabled()) {
log.trace("receive({}) polling file: {}", timeout, getConsumer().getEndpoint());
}
int polled = doReceive(timeout);
if (polled > 0) {
return super.receive(timeout);
} else {
return null;
}
}
protected int doReceive(long timeout) {
int retryCounter = -1;
boolean done = false;
Throwable cause = null;
int polledMessages = 0;
PollingConsumerPollStrategy pollStrategy = getConsumer().getPollStrategy();
boolean sendEmptyMessageWhenIdle = getConsumer() instanceof ScheduledBatchPollingConsumer && getConsumer().isSendEmptyMessageWhenIdle();
StopWatch watch = new StopWatch();
while (!done) {
try {
cause = null;
// eager assume we are done
done = true;
if (isRunAllowed()) {
if (retryCounter == -1) {
log.trace("Starting to poll: {}", this.getEndpoint());
} else {
log.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
}
// mark we are polling which should also include the begin/poll/commit
boolean begin = pollStrategy.begin(getConsumer(), getEndpoint());
if (begin) {
retryCounter++;
polledMessages = getConsumer().poll();
log.trace("Polled {} messages", polledMessages);
if (polledMessages == 0 && sendEmptyMessageWhenIdle) {
// send an "empty" exchange
processEmptyMessage();
} else if (polledMessages == 0 && timeout > 0) {
// if we did not poll a file and we are using timeout then try to poll again
done = false;
}
pollStrategy.commit(getConsumer(), getEndpoint(), polledMessages);
} else {
log.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
}
}
log.trace("Finished polling: {}", this.getEndpoint());
} catch (Exception e) {
try {
boolean retry = pollStrategy.rollback(getConsumer(), getEndpoint(), retryCounter, e);
if (retry) {
// do not set cause as we retry
done = false;
} else {
cause = e;
done = true;
}
} catch (Throwable t) {
cause = t;
done = true;
}
} catch (Throwable t) {
cause = t;
done = true;
}
if (!done && timeout > 0) {
// prepare for next attempt until we hit timeout
long left = timeout - watch.taken();
long min = Math.min(left, delay);
if (min > 0) {
try {
// sleep for next pool
sleep(min);
} catch (InterruptedException e) {
// ignore
}
} else {
// timeout hit
done = true;
}
}
}
if (cause != null) {
throw RuntimeCamelException.wrapRuntimeCamelException(cause);
}
return polledMessages;
}
@Override
public void process(Exchange exchange) throws Exception {
Object name = exchange.getIn().getHeader(Exchange.FILE_NAME);
if (name != null) {
log.debug("Received file: {}", name);
}
super.process(exchange);
}
/**
* No messages to poll so send an empty message instead.
*
* @throws Exception is thrown if error processing the empty message.
*/
protected void processEmptyMessage() throws Exception {
Exchange exchange = getEndpoint().createExchange();
log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
process(exchange);
}
private void sleep(long delay) throws InterruptedException {
if (delay <= 0) {
return;
}
log.trace("Sleeping for: {} millis", delay);
Thread.sleep(delay);
}
}