| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.support; |
| |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.camel.Endpoint; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.FailedToCreateConsumerException; |
| import org.apache.camel.LoggingLevel; |
| import org.apache.camel.PollingConsumerPollingStrategy; |
| import org.apache.camel.Processor; |
| import org.apache.camel.Suspendable; |
| import org.apache.camel.spi.PollingConsumerPollStrategy; |
| import org.apache.camel.spi.ScheduledPollConsumerScheduler; |
| import org.apache.camel.support.service.ServiceHelper; |
| import org.apache.camel.util.ObjectHelper; |
| |
| /** |
| * A useful base class for any consumer which is polling based |
| */ |
| public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, Suspendable, PollingConsumerPollingStrategy { |
| |
| private ScheduledPollConsumerScheduler scheduler; |
| private ScheduledExecutorService scheduledExecutorService; |
| |
| // if adding more options then align with org.apache.camel.support.ScheduledPollEndpoint |
| |
| private boolean startScheduler = true; |
| private long initialDelay = 1000; |
| private long delay = 500; |
| private TimeUnit timeUnit = TimeUnit.MILLISECONDS; |
| private boolean useFixedDelay = true; |
| private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy(); |
| private LoggingLevel runLoggingLevel = LoggingLevel.TRACE; |
| private boolean sendEmptyMessageWhenIdle; |
| private boolean greedy; |
| private int backoffMultiplier; |
| private int backoffIdleThreshold; |
| private int backoffErrorThreshold; |
| private long repeatCount; |
| private Map<String, Object> schedulerProperties; |
| |
| // state during running |
| private volatile boolean polling; |
| private volatile int backoffCounter; |
| private volatile long idleCounter; |
| private volatile long errorCounter; |
| private final AtomicLong counter = new AtomicLong(); |
| |
| public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { |
| super(endpoint, processor); |
| } |
| |
| public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) { |
| super(endpoint, processor); |
| // we have been given an existing thread pool, so we should not manage its lifecycle |
| // so we should keep shutdownExecutor as false |
| this.scheduledExecutorService = scheduledExecutorService; |
| ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService"); |
| } |
| |
| /** |
| * Invoked whenever we should be polled |
| */ |
| @Override |
| public void run() { |
| // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread |
| try { |
| // log starting |
| if (LoggingLevel.ERROR == runLoggingLevel) { |
| log.error("Scheduled task started on: {}", this.getEndpoint()); |
| } else if (LoggingLevel.WARN == runLoggingLevel) { |
| log.warn("Scheduled task started on: {}", this.getEndpoint()); |
| } else if (LoggingLevel.INFO == runLoggingLevel) { |
| log.info("Scheduled task started on: {}", this.getEndpoint()); |
| } else if (LoggingLevel.DEBUG == runLoggingLevel) { |
| log.debug("Scheduled task started on: {}", this.getEndpoint()); |
| } else { |
| log.trace("Scheduled task started on: {}", this.getEndpoint()); |
| } |
| |
| // execute scheduled task |
| doRun(); |
| |
| // log completed |
| if (LoggingLevel.ERROR == runLoggingLevel) { |
| log.error("Scheduled task completed on: {}", this.getEndpoint()); |
| } else if (LoggingLevel.WARN == runLoggingLevel) { |
| log.warn("Scheduled task completed on: {}", this.getEndpoint()); |
| } else if (LoggingLevel.INFO == runLoggingLevel) { |
| log.info("Scheduled task completed on: {}", this.getEndpoint()); |
| } else if (LoggingLevel.DEBUG == runLoggingLevel) { |
| log.debug("Scheduled task completed on: {}", this.getEndpoint()); |
| } else { |
| log.trace("Scheduled task completed on: {}", this.getEndpoint()); |
| } |
| |
| } catch (Error e) { |
| // must catch Error, to ensure the task is re-scheduled |
| log.error("Error occurred during running scheduled task on: " + this.getEndpoint() + ", due: " + e.getMessage(), e); |
| } |
| } |
| |
| private void doRun() { |
| if (isSuspended()) { |
| log.trace("Cannot start to poll: {} as its suspended", this.getEndpoint()); |
| return; |
| } |
| |
| // should we backoff if its enabled, and either the idle or error counter is > the threshold |
| if (backoffMultiplier > 0 |
| // either idle or error threshold could be not in use, so check for that and use MAX_VALUE if not in use |
| && (idleCounter >= (backoffIdleThreshold > 0 ? backoffIdleThreshold : Integer.MAX_VALUE)) |
| || errorCounter >= (backoffErrorThreshold > 0 ? backoffErrorThreshold : Integer.MAX_VALUE)) { |
| if (backoffCounter++ < backoffMultiplier) { |
| // yes we should backoff |
| if (idleCounter > 0) { |
| log.debug("doRun() backoff due subsequent {} idles (backoff at {}/{})", idleCounter, backoffCounter, backoffMultiplier); |
| } else { |
| log.debug("doRun() backoff due subsequent {} errors (backoff at {}/{})", errorCounter, backoffCounter, backoffMultiplier); |
| } |
| return; |
| } else { |
| // we are finished with backoff so reset counters |
| idleCounter = 0; |
| errorCounter = 0; |
| backoffCounter = 0; |
| log.trace("doRun() backoff finished, resetting counters."); |
| } |
| } |
| |
| long count = counter.incrementAndGet(); |
| boolean stopFire = repeatCount > 0 && count > repeatCount; |
| if (stopFire) { |
| log.debug("Cancelling {} scheduler as repeat count limit reached after {} counts.", getEndpoint(), repeatCount); |
| scheduler.unscheduleTask(); |
| return; |
| } |
| |
| int retryCounter = -1; |
| boolean done = false; |
| Throwable cause = null; |
| int polledMessages = 0; |
| |
| while (!done) { |
| try { |
| cause = null; |
| // eager assume we are done |
| done = true; |
| if (isPollAllowed()) { |
| |
| if (retryCounter == -1) { |
| log.trace("Starting to poll: {}", this.getEndpoint()); |
| } else { |
| log.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint()); |
| } |
| |
| // mark we are polling which should also include the begin/poll/commit |
| polling = true; |
| try { |
| boolean begin = pollStrategy.begin(this, getEndpoint()); |
| if (begin) { |
| retryCounter++; |
| polledMessages = poll(); |
| log.trace("Polled {} messages", polledMessages); |
| |
| if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) { |
| // send an "empty" exchange |
| processEmptyMessage(); |
| } |
| |
| pollStrategy.commit(this, getEndpoint(), polledMessages); |
| |
| if (polledMessages > 0 && isGreedy()) { |
| done = false; |
| retryCounter = -1; |
| log.trace("Greedy polling after processing {} messages", polledMessages); |
| } |
| } else { |
| log.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); |
| } |
| } finally { |
| polling = false; |
| } |
| } |
| |
| log.trace("Finished polling: {}", this.getEndpoint()); |
| } catch (Exception e) { |
| try { |
| boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e); |
| if (retry) { |
| // do not set cause as we retry |
| done = false; |
| } else { |
| cause = e; |
| done = true; |
| } |
| } catch (Throwable t) { |
| cause = t; |
| done = true; |
| } |
| } catch (Throwable t) { |
| cause = t; |
| done = true; |
| } |
| |
| if (cause != null && isRunAllowed()) { |
| // let exception handler deal with the caused exception |
| // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown |
| try { |
| getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint() |
| + ". Will try again at next poll", cause); |
| } catch (Throwable e) { |
| log.warn("Error handling exception. This exception will be ignored.", e); |
| } |
| } |
| } |
| |
| if (cause != null) { |
| idleCounter = 0; |
| errorCounter++; |
| } else { |
| idleCounter = polledMessages == 0 ? ++idleCounter : 0; |
| errorCounter = 0; |
| } |
| log.trace("doRun() done with idleCounter={}, errorCounter={}", idleCounter, errorCounter); |
| |
| // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread |
| } |
| |
| /** |
| * No messages to poll so send an empty message instead. |
| * |
| * @throws Exception is thrown if error processing the empty message. |
| */ |
| protected void processEmptyMessage() throws Exception { |
| Exchange exchange = getEndpoint().createExchange(); |
| log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint()); |
| getProcessor().process(exchange); |
| } |
| |
| // Properties |
| // ------------------------------------------------------------------------- |
| |
| protected boolean isPollAllowed() { |
| return isRunAllowed() && !isSuspended(); |
| } |
| |
| /** |
| * Whether polling is currently in progress |
| */ |
| public boolean isPolling() { |
| return polling; |
| } |
| |
| public ScheduledPollConsumerScheduler getScheduler() { |
| return scheduler; |
| } |
| |
| public void setScheduler(ScheduledPollConsumerScheduler scheduler) { |
| this.scheduler = scheduler; |
| } |
| |
| public Map<String, Object> getSchedulerProperties() { |
| return schedulerProperties; |
| } |
| |
| public void setSchedulerProperties(Map<String, Object> schedulerProperties) { |
| this.schedulerProperties = schedulerProperties; |
| } |
| |
| public long getInitialDelay() { |
| return initialDelay; |
| } |
| |
| public void setInitialDelay(long initialDelay) { |
| this.initialDelay = initialDelay; |
| } |
| |
| public long getDelay() { |
| return delay; |
| } |
| |
| public void setDelay(long delay) { |
| this.delay = delay; |
| } |
| |
| public TimeUnit getTimeUnit() { |
| return timeUnit; |
| } |
| |
| public void setTimeUnit(TimeUnit timeUnit) { |
| this.timeUnit = timeUnit; |
| } |
| |
| public boolean isUseFixedDelay() { |
| return useFixedDelay; |
| } |
| |
| public void setUseFixedDelay(boolean useFixedDelay) { |
| this.useFixedDelay = useFixedDelay; |
| } |
| |
| public LoggingLevel getRunLoggingLevel() { |
| return runLoggingLevel; |
| } |
| |
| public void setRunLoggingLevel(LoggingLevel runLoggingLevel) { |
| this.runLoggingLevel = runLoggingLevel; |
| } |
| |
| public PollingConsumerPollStrategy getPollStrategy() { |
| return pollStrategy; |
| } |
| |
| public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) { |
| this.pollStrategy = pollStrategy; |
| } |
| |
| public boolean isStartScheduler() { |
| return startScheduler; |
| } |
| |
| public void setStartScheduler(boolean startScheduler) { |
| this.startScheduler = startScheduler; |
| } |
| |
| public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) { |
| this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle; |
| } |
| |
| public boolean isSendEmptyMessageWhenIdle() { |
| return sendEmptyMessageWhenIdle; |
| } |
| |
| public boolean isGreedy() { |
| return greedy; |
| } |
| |
| public void setGreedy(boolean greedy) { |
| this.greedy = greedy; |
| } |
| |
| public int getBackoffCounter() { |
| return backoffCounter; |
| } |
| |
| public int getBackoffMultiplier() { |
| return backoffMultiplier; |
| } |
| |
| public void setBackoffMultiplier(int backoffMultiplier) { |
| this.backoffMultiplier = backoffMultiplier; |
| } |
| |
| public int getBackoffIdleThreshold() { |
| return backoffIdleThreshold; |
| } |
| |
| public void setBackoffIdleThreshold(int backoffIdleThreshold) { |
| this.backoffIdleThreshold = backoffIdleThreshold; |
| } |
| |
| public int getBackoffErrorThreshold() { |
| return backoffErrorThreshold; |
| } |
| |
| public void setBackoffErrorThreshold(int backoffErrorThreshold) { |
| this.backoffErrorThreshold = backoffErrorThreshold; |
| } |
| |
| public long getRepeatCount() { |
| return repeatCount; |
| } |
| |
| public void setRepeatCount(long repeatCount) { |
| this.repeatCount = repeatCount; |
| } |
| |
| public ScheduledExecutorService getScheduledExecutorService() { |
| return scheduledExecutorService; |
| } |
| |
| public boolean isSchedulerStarted() { |
| return scheduler.isSchedulerStarted(); |
| } |
| |
| public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { |
| this.scheduledExecutorService = scheduledExecutorService; |
| } |
| |
| // Implementation methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * The polling method which is invoked periodically to poll this consumer |
| * |
| * @return number of messages polled, will be <tt>0</tt> if no message was polled at all. |
| * @throws Exception can be thrown if an exception occurred during polling |
| */ |
| protected abstract int poll() throws Exception; |
| |
| @Override |
| protected void doInit() throws Exception { |
| super.doInit(); |
| |
| // validate that if backoff multiplier is in use, the threshold values is set correctly |
| if (backoffMultiplier > 0) { |
| if (backoffIdleThreshold <= 0 && backoffErrorThreshold <= 0) { |
| throw new IllegalArgumentException("backoffIdleThreshold and/or backoffErrorThreshold must be configured to a positive value when using backoffMultiplier"); |
| } |
| log.debug("Using backoff[multiplier={}, idleThreshold={}, errorThreshold={}] on {}", backoffMultiplier, backoffIdleThreshold, backoffErrorThreshold, getEndpoint()); |
| } |
| |
| if (scheduler == null) { |
| DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(scheduledExecutorService); |
| scheduler.setDelay(delay); |
| scheduler.setInitialDelay(initialDelay); |
| scheduler.setTimeUnit(timeUnit); |
| scheduler.setUseFixedDelay(useFixedDelay); |
| this.scheduler = scheduler; |
| } |
| scheduler.setCamelContext(getEndpoint().getCamelContext()); |
| scheduler.onInit(this); |
| |
| // configure scheduler with options from this consumer |
| if (schedulerProperties != null && !schedulerProperties.isEmpty()) { |
| // need to use a copy in case the consumer is restarted so we keep the properties |
| Map<String, Object> copy = new LinkedHashMap<>(schedulerProperties); |
| PropertyBindingSupport.build().bind(getEndpoint().getCamelContext(), scheduler, copy); |
| if (copy.size() > 0) { |
| throw new FailedToCreateConsumerException(getEndpoint(), "There are " + copy.size() |
| + " scheduler parameters that couldn't be set on the endpoint." |
| + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." |
| + " Unknown parameters=[" + copy + "]"); |
| } |
| } |
| |
| ObjectHelper.notNull(scheduler, "scheduler", this); |
| ObjectHelper.notNull(pollStrategy, "pollStrategy", this); |
| } |
| |
| @Override |
| protected void doStart() throws Exception { |
| super.doStart(); |
| |
| if (scheduler != null) { |
| scheduler.scheduleTask(this); |
| ServiceHelper.startService(scheduler); |
| |
| if (isStartScheduler()) { |
| startScheduler(); |
| } |
| } |
| } |
| |
| /** |
| * Starts the scheduler. |
| * <p/> |
| * If the scheduler is already started, then this is a noop method call. |
| */ |
| public void startScheduler() { |
| scheduler.startScheduler(); |
| } |
| |
| @Override |
| protected void doStop() throws Exception { |
| if (scheduler != null) { |
| scheduler.unscheduleTask(); |
| ServiceHelper.stopAndShutdownServices(scheduler); |
| } |
| |
| // clear counters |
| backoffCounter = 0; |
| idleCounter = 0; |
| errorCounter = 0; |
| counter.set(0); |
| |
| super.doStop(); |
| } |
| |
| @Override |
| protected void doShutdown() throws Exception { |
| ServiceHelper.stopAndShutdownServices(scheduler); |
| super.doShutdown(); |
| } |
| |
| @Override |
| protected void doSuspend() throws Exception { |
| // dont stop/cancel the future task since we just check in the run method |
| } |
| |
| @Override |
| public void onInit() throws Exception { |
| // make sure the scheduler is starting |
| startScheduler = true; |
| } |
| |
| @Override |
| public long beforePoll(long timeout) throws Exception { |
| log.trace("Before poll {}", getEndpoint()); |
| // resume or start our self |
| if (!ServiceHelper.resumeService(this)) { |
| ServiceHelper.startService(this); |
| } |
| |
| // ensure at least timeout is as long as one poll delay |
| return Math.max(timeout, getDelay()); |
| } |
| |
| @Override |
| public void afterPoll() throws Exception { |
| log.trace("After poll {}", getEndpoint()); |
| // suspend or stop our self |
| if (!ServiceHelper.suspendService(this)) { |
| ServiceHelper.stopService(this); |
| } |
| } |
| |
| } |