blob: c748ee2da38ce977b051cc093da8a2e5c98d6b87 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.synapse.message.processors;
import org.apache.synapse.SynapseException;
import org.apache.synapse.core.SynapseEnvironment;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.Map;
public abstract class ScheduledMessageProcessor extends AbstractMessageProcessor {
public static final String SCHEDULED_MESSAGE_PROCESSOR_GROUP =
"synapse.message.processor.quartz";
public static final String PROCESSOR_INSTANCE = "processor.instance";
/**
* The scheduler, run the the processor
*/
protected Scheduler scheduler = null;
/**
* The interval at which this processor runs , default value is 1000ms
*/
protected long interval = 1000;
protected enum State {
INITIALIZED,
START,
STOP,
DESTROY
}
/**
* The quartz configuration file if specified as a parameter
*/
protected String quartzConfig = null;
/**
* A cron expression to run the sampler
*/
protected String cronExpression = null;
/**
* Keep the state of the message processor
*/
protected State state = State.DESTROY;
public void start() {
Trigger trigger;
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger().
withIdentity(name + "-trigger-");
if (cronExpression == null || "".equals(cronExpression)) {
trigger = triggerBuilder.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMilliseconds(interval)
.repeatForever())
.build();
} else {
trigger = triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
}
JobBuilder jobBuilder = getJobBuilder();
JobDataMap jobDataMap = getJobDataMap();
jobDataMap.put(MessageProcessorConstants.MESSAGE_STORE,
configuration.getMessageStore(messageStore));
jobDataMap.put(MessageProcessorConstants.PARAMETERS, parameters);
JobDetail jobDetail = jobBuilder.usingJobData(jobDataMap).build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
throw new SynapseException("Error scheduling job : " + jobDetail
+ " with trigger " + trigger ,e);
}
}
public void stop() {
if (state == State.START) {
try {
if (scheduler != null && scheduler.isStarted()) {
if (log.isDebugEnabled()) {
log.debug("ShuttingDown Message Processor Scheduler : " + scheduler.getMetaData());
}
scheduler.standby();
}
state = State.STOP;
} catch (SchedulerException e) {
throw new SynapseException("Error ShuttingDown Message processor scheduler ", e);
}
}
}
public void setParameters(Map<String, Object> parameters) {
super.setParameters(parameters);
if (parameters != null && !parameters.isEmpty()) {
Object o = parameters.get(MessageProcessorConstants.CRON_EXPRESSION);
if (o != null) {
cronExpression = o.toString();
}
o = parameters.get(MessageProcessorConstants.INTERVAL);
if (o != null) {
interval = Integer.parseInt(o.toString());
}
o = parameters.get(MessageProcessorConstants.QUARTZ_CONF);
if (o != null) {
quartzConfig = o.toString();
}
}
}
public void init(SynapseEnvironment se) {
super.init(se);
StdSchedulerFactory sf = new StdSchedulerFactory();
try {
if (quartzConfig != null && !"".equals(quartzConfig)) {
if (log.isDebugEnabled()) {
log.debug("Initiating a Scheduler with configuration : " + quartzConfig);
}
sf.initialize(quartzConfig);
}
} catch (SchedulerException e) {
throw new SynapseException("Error initiating scheduler factory "
+ sf + "with configuration loaded from " + quartzConfig, e);
}
try {
scheduler = sf.getScheduler();
} catch (SchedulerException e) {
throw new SynapseException("Error getting a scheduler instance form scheduler" +
" factory " + sf, e);
}
try {
scheduler.start();
state = State.INITIALIZED;
this.start();
} catch (SchedulerException e) {
throw new SynapseException("Error starting the scheduler", e);
}
}
protected abstract JobBuilder getJobBuilder();
protected JobDataMap getJobDataMap() {
return new JobDataMap();
}
public void destroy() {
try {
scheduler.deleteJob( new JobKey(name + "-trigger",SCHEDULED_MESSAGE_PROCESSOR_GROUP));
scheduler.shutdown(true);
} catch (SchedulerException e) {
log.error("Error while destroying the task " + e);
}
state = State.DESTROY;
}
}