blob: a2cf79ec713d5bee7bdb4ef65e56991eb0614c6b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.timer;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The timer consumer.
*
* @version
*/
public class TimerConsumer extends DefaultConsumer {
private static final transient Logger LOG = LoggerFactory.getLogger(TimerConsumer.class);
private final TimerEndpoint endpoint;
private volatile TimerTask task;
public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
@Override
protected void doStart() throws Exception {
task = new TimerTask() {
// counter
private final AtomicLong counter = new AtomicLong();
@Override
public void run() {
try {
long count = counter.incrementAndGet();
boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
if (fire) {
sendTimerExchange(count);
} else {
// no need to fire anymore as we exceeded repeat count
LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
cancel();
}
} catch (Throwable e) {
// catch all to avoid the JVM closing the thread and not firing again
LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
}
}
};
Timer timer = endpoint.getTimer();
configureTask(task, timer);
}
@Override
protected void doStop() throws Exception {
if (task != null) {
task.cancel();
}
task = null;
}
protected void configureTask(TimerTask task, Timer timer) {
if (endpoint.isFixedRate()) {
if (endpoint.getTime() != null) {
timer.scheduleAtFixedRate(task, endpoint.getTime(), endpoint.getPeriod());
} else {
timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod());
}
} else {
if (endpoint.getTime() != null) {
if (endpoint.getPeriod() > 0) {
timer.schedule(task, endpoint.getTime(), endpoint.getPeriod());
} else {
timer.schedule(task, endpoint.getTime());
}
} else {
if (endpoint.getPeriod() > 0) {
timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod());
} else {
timer.schedule(task, endpoint.getDelay());
}
}
}
}
protected void sendTimerExchange(long counter) {
Exchange exchange = endpoint.createExchange();
exchange.setProperty(Exchange.TIMER_COUNTER, counter);
exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
exchange.setProperty(Exchange.TIMER_TIME, endpoint.getTime());
exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
Date now = new Date();
exchange.setProperty(Exchange.TIMER_FIRED_TIME, now);
// also set now on in header with same key as quartz to be consistent
exchange.getIn().setHeader("firedTime", now);
LOG.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter);
try {
getProcessor().process(exchange);
// log exception if an exception occurred and was not handled
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
}
} catch (Exception e) {
getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
}
}
}