blob: effee7ca25def5f145e1106032076aa35d0103a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openejb.core.timer;
import org.apache.openejb.ApplicationException;
import org.apache.openejb.BeanContext;
import org.apache.openejb.InterfaceType;
import org.apache.openejb.OpenEJBException;
import org.apache.openejb.OpenEJBRuntimeException;
import org.apache.openejb.RpcContainer;
import org.apache.openejb.core.BaseContext;
import org.apache.openejb.core.timer.quartz.PatchedStdJDBCDelegate;
import org.apache.openejb.core.transaction.TransactionType;
import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.monitoring.LocalMBeanServer;
import org.apache.openejb.quartz.JobBuilder;
import org.apache.openejb.quartz.JobDataMap;
import org.apache.openejb.quartz.JobDetail;
import org.apache.openejb.quartz.Scheduler;
import org.apache.openejb.quartz.SchedulerException;
import org.apache.openejb.quartz.Trigger;
import org.apache.openejb.quartz.TriggerKey;
import org.apache.openejb.quartz.impl.StdSchedulerFactory;
import org.apache.openejb.quartz.impl.jdbcjobstore.JobStoreSupport;
import org.apache.openejb.quartz.impl.jdbcjobstore.StdJDBCDelegate;
import org.apache.openejb.quartz.impl.triggers.AbstractTrigger;
import org.apache.openejb.quartz.listeners.SchedulerListenerSupport;
import org.apache.openejb.quartz.simpl.RAMJobStore;
import org.apache.openejb.resource.quartz.QuartzResourceAdapter;
import org.apache.openejb.spi.ContainerSystem;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
import org.apache.openejb.util.SetAccessible;
import javax.ejb.EJBContext;
import javax.ejb.EJBException;
import javax.ejb.ScheduleExpression;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class EjbTimerServiceImpl implements EjbTimerService, Serializable {
private static final long serialVersionUID = 1L;
private static final Logger log = Logger.getInstance(LogCategory.TIMER, "org.apache.openejb.util.resources");
// Quartz property key; defaulted to "true" in defaultQuartzConfiguration when JMX is active and the key is not set.
public static final String QUARTZ_JMX = "org.apache.openejb.quartz.scheduler.jmx.export";
// Quartz property key; defaulted to "true" in defaultQuartzConfiguration when not set.
public static final String QUARTZ_MAKE_SCHEDULER_THREAD_DAEMON = "org.apache.openejb.quartz.scheduler.makeSchedulerThreadDaemon";
// Name and group of the single durable job that every timer trigger is attached to (see getDefaultScheduler and schedule).
public static final String OPENEJB_TIMEOUT_JOB_NAME = "OPENEJB_TIMEOUT_JOB";
public static final String OPENEJB_TIMEOUT_JOB_GROUP_NAME = "OPENEJB_TIMEOUT_GROUP";
// Bean option read (default 1) when a negative retryAttempts is passed to the constructor.
public static final String EJB_TIMER_RETRY_ATTEMPTS = "EjbTimer.RetryAttempts";
// Property key; when "true" the scheduler is created with the bean class loader as context class loader.
public static final String OPENEJB_QUARTZ_USE_TCCL = "openejb.quartz.use-TCCL";
// True when the timeout method's transaction type is Required or RequiresNew; written by writeObject.
private boolean transacted;
// Number of additional ejbTimeout attempts after the first one; written by writeObject.
private int retryAttempts;
// The transient fields below are not serialized; readObject re-resolves them from the deployment id.
private transient TransactionManager transactionManager;
private transient BeanContext deployment;
private transient TimerStore timerStore;
// Null until start() has run; isStarted() relies on that.
private transient Scheduler scheduler;
/**
* Creates a timer service for the given bean using the default transaction manager.
*
* A retry count of -1 is passed on, which makes the delegate constructor read the
* count from the bean options.
*
* @param deployment the bean this timer service belongs to
* @param timerStore the store holding the timers of the bean
*/
public EjbTimerServiceImpl(final BeanContext deployment, final TimerStore timerStore) {
this(deployment, getDefaultTransactionManager(), timerStore, -1);
log.isDebugEnabled(); // touch logger to force it to be initialized
}
/**
 * Looks up the transaction manager registered as a component of the current SystemInstance.
 *
 * @return the component registered for {@code TransactionManager.class}, as returned by the SystemInstance
 */
public static TransactionManager getDefaultTransactionManager() {
    final SystemInstance system = SystemInstance.get();
    return system.getComponent(TransactionManager.class);
}
/**
 * Creates a timer service for the given bean.
 *
 * @param deployment         the bean this timer service belongs to
 * @param transactionManager the transaction manager used around timeout invocations
 * @param timerStore         the store holding the timers of the bean
 * @param retryAttempts      number of retries after a failed timeout call; a negative value
 *                           means "read it from the bean options" (default 1)
 */
public EjbTimerServiceImpl(final BeanContext deployment, final TransactionManager transactionManager, final TimerStore timerStore, final int retryAttempts) {
    this.deployment = deployment;
    this.transactionManager = transactionManager;
    this.timerStore = timerStore;

    // the timeout call is wrapped in a container transaction only for Required / RequiresNew
    final TransactionType timeoutTxType = deployment.getTransactionType(deployment.getEjbTimeout());
    this.transacted = TransactionType.Required == timeoutTxType || TransactionType.RequiresNew == timeoutTxType;

    this.retryAttempts = retryAttempts < 0
        ? deployment.getOptions().get(EJB_TIMER_RETRY_ATTEMPTS, 1)
        : retryAttempts;
}
/**
 * Custom serialization: writes the deployment id followed by the transacted flag and
 * the retry count. The field order must match what readObject reads.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
private void writeObject(final ObjectOutputStream out) throws IOException {
    final String deploymentId = deployment.getDeploymentID().toString();
    out.writeUTF(deploymentId);
    out.writeBoolean(transacted);
    out.writeInt(retryAttempts);
}
/**
 * Custom deserialization: reads the deployment id, transacted flag and retry count written by
 * writeObject, then re-resolves the transient collaborators from the running system.
 *
 * The scheduler is restored as a proxy that resolves the real scheduler on each call through
 * {@link #getDefaultScheduler(BeanContext)}.
 *
 * @param in the stream to read from
 * @throws IOException if reading fails, or if no bean context / timer service can be resolved
 *                     for the serialized deployment id (instead of failing with a bare
 *                     NullPointerException)
 */
private void readObject(final ObjectInputStream in) throws IOException {
    final String dId = in.readUTF();
    transacted = in.readBoolean();
    retryAttempts = in.readInt();

    deployment = SystemInstance.get().getComponent(ContainerSystem.class).getBeanContext(dId);
    if (deployment == null) {
        // the bean is not available in this system: report it as a stream failure with the id
        throw new IOException("Cannot restore EjbTimerServiceImpl, no bean context found for deployment id '" + dId + "'");
    }

    transactionManager = getDefaultTransactionManager();

    final EjbTimerService timerService = deployment.getEjbTimerService();
    if (timerService == null) {
        throw new IOException("Cannot restore EjbTimerServiceImpl, bean '" + dId + "' has no timer service");
    }
    timerStore = timerService.getTimerStore();

    scheduler = (Scheduler) Proxy.newProxyInstance(deployment.getClassLoader(), new Class<?>[]{Scheduler.class}, new LazyScheduler(deployment));
}
/**
* Returns the Quartz scheduler to use for the given bean, creating and starting one when needed.
*
* A scheduler already stored on the bean context is returned when it does not report itself as
* shut down. Otherwise the quartz related properties of the system, application, module and bean
* are merged (later sources overwrite earlier ones). If at least one quartz related property was
* found, a dedicated scheduler is created for the bean; if none was found, the scheduler registered
* on the SystemInstance is reused, or created and registered there when absent. A newly created
* scheduler gets the shared durable timeout job added. The result is stored on the bean context.
*
* NOTE(review): the second lookup inside the synchronized block returns any non-null scheduler
* without re-checking isShutdown(), so a scheduler rejected by the first check looks like it is
* returned anyway - confirm whether this is intended.
* NOTE(review): the method is already static synchronized; the additional lock on the value
* returned by deployment.getId() may be redundant - confirm before changing.
*
* @param deployment the bean whose scheduler is requested
* @return the scheduler to use for this bean
* @throws OpenEJBRuntimeException if creating or starting the scheduler, or adding the timeout job, fails
*/
public static synchronized Scheduler getDefaultScheduler(final BeanContext deployment) {
Scheduler scheduler = deployment.get(Scheduler.class);
if (scheduler != null) {
boolean valid;
try {
valid = !scheduler.isShutdown();
} catch (final Exception ignored) {
// a scheduler that cannot report its state is treated as unusable
valid = false;
}
if (valid) {
return scheduler;
}
}
Scheduler thisScheduler;
synchronized (deployment.getId()) { // should be done only once so no perf issues
scheduler = deployment.get(Scheduler.class);
if (scheduler != null) {
return scheduler;
}
final Properties properties = new Properties();
// putAll returns how many quartz related keys each source contributed
int quartzProps = 0;
quartzProps += putAll(properties, SystemInstance.get().getProperties());
quartzProps += putAll(properties, deployment.getModuleContext().getAppContext().getProperties());
quartzProps += putAll(properties, deployment.getModuleContext().getProperties());
quartzProps += putAll(properties, deployment.getProperties());
// custom config -> don't use default/global scheduler
// if one day we want to keep a global config for a global scheduler (SystemInstance.get().getProperties()) we'll need to manage resume/pause etc correctly by app
// since we have a scheduler by ejb today in such a case we don't need
final boolean newInstance = quartzProps > 0;
final SystemInstance systemInstance = SystemInstance.get();
scheduler = systemInstance.getComponent(Scheduler.class);
if (scheduler == null || newInstance) {
final boolean useTccl = "true".equalsIgnoreCase(properties.getProperty(OPENEJB_QUARTZ_USE_TCCL, "false"));
defaultQuartzConfiguration(properties, deployment, newInstance, useTccl);
try {
// start in container context to avoid thread leaks
final ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
if (useTccl) {
Thread.currentThread().setContextClassLoader(deployment.getClassLoader());
} else {
Thread.currentThread().setContextClassLoader(EjbTimerServiceImpl.class.getClassLoader());
}
try {
thisScheduler = new StdSchedulerFactory(properties).getScheduler();
thisScheduler.start();
} finally {
// always restore the caller's context class loader
Thread.currentThread().setContextClassLoader(oldCl);
}
//durability is configured with true, which means that the job will be kept in the store even if no trigger is attached to it.
//Currently, all the EJB beans share with the same job instance
final JobDetail job = JobBuilder.newJob(EjbTimeoutJob.class)
.withIdentity(OPENEJB_TIMEOUT_JOB_NAME, OPENEJB_TIMEOUT_JOB_GROUP_NAME)
.storeDurably(true)
.requestRecovery(false)
.build();
thisScheduler.addJob(job, true);
} catch (final SchedulerException e) {
throw new OpenEJBRuntimeException("Fail to initialize the default scheduler", e);
}
// only a scheduler built without custom quartz properties is shared through the SystemInstance
if (!newInstance) {
systemInstance.setComponent(Scheduler.class, thisScheduler);
}
} else {
thisScheduler = scheduler;
}
deployment.set(Scheduler.class, thisScheduler);
}
return thisScheduler;
}
/**
 * Completes the given quartz configuration with OpenEJB defaults; values already present are kept.
 *
 * @param properties  the configuration to complete, modified in place
 * @param deployment  the bean the scheduler is created for; its deployment id is used as
 *                    scheduler instance id when a dedicated scheduler is created
 * @param newInstance true when a dedicated scheduler (custom quartz properties) is being created
 * @param tccl        true when the scheduler is created with the bean class loader as context
 *                    class loader; the delegate/persister adjustments are only applied when false
 */
private static void defaultQuartzConfiguration(final Properties properties, final BeanContext deployment, final boolean newInstance, final boolean tccl) {
    final String defaultThreadPool = DefaultTimerThreadPoolAdapter.class.getName();
    if (!properties.containsKey(StdSchedulerFactory.PROP_THREAD_POOL_CLASS)) {
        properties.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, defaultThreadPool);
    }
    if (!properties.containsKey(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME)) {
        properties.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, "OpenEJB-TimerService-Scheduler");
    }
    if (!properties.containsKey("org.terracotta.quartz.skipUpdateCheck")) {
        properties.put("org.terracotta.quartz.skipUpdateCheck", "true");
    }
    if (!properties.containsKey(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN)) {
        properties.put(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN, "true");
    }
    if (!properties.containsKey(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT)) {
        properties.put(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT, "true");
    }
    if (!properties.containsKey(QUARTZ_MAKE_SCHEDULER_THREAD_DAEMON)) {
        properties.put(QUARTZ_MAKE_SCHEDULER_THREAD_DAEMON, "true");
    }
    if (!properties.containsKey(QUARTZ_JMX) && LocalMBeanServer.isJMXActive()) {
        properties.put(QUARTZ_JMX, "true");
    }
    if (!properties.containsKey(StdSchedulerFactory.PROP_SCHED_INSTANCE_ID)) {
        if (!newInstance) {
            properties.setProperty(StdSchedulerFactory.PROP_SCHED_INSTANCE_ID, "OpenEJB");
        } else {
            properties.setProperty(StdSchedulerFactory.PROP_SCHED_INSTANCE_ID, deployment.getDeploymentID().toString());
        }
    }

    if (!tccl) {
        final String driverDelegate = properties.getProperty("org.apache.openejb.quartz.jobStore.driverDelegateClass");
        if (driverDelegate != null && StdJDBCDelegate.class.getName().equals(driverDelegate)) {
            // swap the standard delegate for the patched one
            properties.put("org.apache.openejb.quartz.jobStore.driverDelegateClass", PatchedStdJDBCDelegate.class.getName());
        } else if (driverDelegate != null) {
            log.info("You use " + driverDelegate + " driver delegate with quartz, ensure it doesn't use ObjectInputStream otherwise your custom TimerData can induce some issues");
        }

        // adding our custom persister
        if (properties.containsKey("org.apache.openejb.quartz.jobStore.class") && !properties.containsKey("org.apache.openejb.quartz.jobStore.driverDelegateInitString")) {
            try {
                final Class<?> clazz = EjbTimerServiceImpl.class.getClassLoader().loadClass(properties.getProperty("org.apache.openejb.quartz.jobStore.class"));
                if (JobStoreSupport.class.isAssignableFrom(clazz)) {
                    properties.put("org.apache.openejb.quartz.jobStore.driverDelegateInitString",
                        "triggerPersistenceDelegateClasses=" + EJBCronTriggerPersistenceDelegate.class.getName());
                }
            } catch (final Throwable th) {
                // best effort: the persister is simply not registered, but leave a trace for diagnosis
                log.debug("Can't inspect job store class to register the EJB cron trigger persistence delegate", th);
            }
        }
    }

    if (defaultThreadPool.equals(properties.get(StdSchedulerFactory.PROP_THREAD_POOL_CLASS))) {
        // map the quartz thread count onto the pool size understood by the default thread pool
        if (properties.containsKey("org.apache.openejb.quartz.threadPool.threadCount")
            && !properties.containsKey(DefaultTimerThreadPoolAdapter.OPENEJB_TIMER_POOL_SIZE)) {
            log.info("Found property 'org.apache.openejb.quartz.threadPool.threadCount' for default thread pool, please use '"
                + DefaultTimerThreadPoolAdapter.OPENEJB_TIMER_POOL_SIZE + "' instead");
            properties.put(DefaultTimerThreadPoolAdapter.OPENEJB_TIMER_POOL_SIZE, properties.getProperty("org.apache.openejb.quartz.threadPool.threadCount"));
        }
        if (properties.containsKey("org.quartz.threadPool.threadCount")
            && !properties.containsKey(DefaultTimerThreadPoolAdapter.OPENEJB_TIMER_POOL_SIZE)) {
            log.info("Found property 'org.quartz.threadPool.threadCount' for default thread pool, please use '"
                + DefaultTimerThreadPoolAdapter.OPENEJB_TIMER_POOL_SIZE + "' instead");
            properties.put(DefaultTimerThreadPoolAdapter.OPENEJB_TIMER_POOL_SIZE, properties.getProperty("org.quartz.threadPool.threadCount"));
        }
    }

    // to ensure we can shutdown correctly, default doesn't support such a configuration
    if (!properties.getProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, RAMJobStore.class.getName()).equals(RAMJobStore.class.getName())) {
        final String daemonsKey = "org.apache.openejb.quartz.jobStore.makeThreadsDaemons";
        // historical misspelling (missing trailing 's') that this method used to read exclusively
        final String legacyDaemonsKey = "org.apache.openejb.quartz.jobStore.makeThreadsDaemon";
        // The legacy key keeps precedence so existing configurations behave exactly as before;
        // otherwise a value supplied under the real key is honoured instead of being overwritten by "true".
        final String daemons = properties.getProperty(legacyDaemonsKey, properties.getProperty(daemonsKey, "true"));
        properties.put(daemonsKey, daemons);
    }
}
/**
 * Copies every String-valued entry of {@code b} into {@code a} and counts the quartz related keys.
 *
 * Keys starting with "org.quartz" are stored under the "org.apache.openejb.quartz" prefix instead.
 * Entries whose value is not a String are not copied but still take part in the count.
 *
 * @param a the properties receiving the entries
 * @param b the properties to read from
 * @return the number of keys in {@code b} that are quartz related
 */
private static int putAll(final Properties a, final Properties b) {
    int matches = 0;
    for (final Map.Entry<Object, Object> property : b.entrySet()) {
        final String name = property.getKey().toString();

        final boolean quartzRelated = name.startsWith("org.quartz.")
            || name.startsWith("org.apache.openejb.quartz.")
            || name.startsWith("openejb.quartz.")
            || DefaultTimerThreadPoolAdapter.OPENEJB_TIMER_POOL_SIZE.equals(name)
            || "org.terracotta.quartz.skipUpdateCheck".equals(name);
        if (quartzRelated) {
            matches++;
        }

        final Object raw = property.getValue();
        if (!(raw instanceof String)) {
            continue;
        }

        final String target = name.startsWith("org.quartz")
            ? "org.apache.openejb.quartz" + name.substring("org.quartz".length())
            : name;
        a.put(target, raw);
    }
    return matches;
}
/**
* Stops this timer service: first stops the timers of the bean, then shuts down the
* scheduler if it is a dedicated one (see shutdownMyScheduler).
*/
@Override
public void stop() {
cleanTimerData();
shutdownMyScheduler();
}
/**
 * Stops every timer of this bean that still has a trigger.
 *
 * Does nothing when the service was never started or is not fully initialised. A failure
 * to stop one timer is logged and does not prevent the remaining timers from being stopped.
 */
private void cleanTimerData() {
    final boolean initialised = timerStore != null && scheduler != null && deployment != null;
    if (!initialised) {
        return;
    }

    final String deploymentId = deployment.getDeploymentID().toString();
    final Collection<TimerData> timers = timerStore.getTimers(deploymentId);
    if (timers == null) {
        return;
    }

    for (final TimerData timer : timers) {
        final Trigger trigger = timer.getTrigger();
        if (trigger != null) {
            // resolve the key up front so it can be reported if stopping fails
            final TriggerKey key = trigger.getKey();
            try {
                timer.stop();
            } catch (final EJBException ignored) {
                log.warning("An error occured deleting trigger '" + key + "' on bean " + deployment.getDeploymentID());
            }
        }
    }
}
/**
 * Shuts down the scheduler of this service unless it is the one shared through the SystemInstance.
 *
 * The scheduler counts as shared when it is the same instance as the registered one or has the
 * same scheduler name. If that comparison fails with an exception the scheduler is treated as
 * a dedicated one and is shut down.
 */
private void shutdownMyScheduler() {
    if (scheduler == null) {
        return;
    }

    final Scheduler ds = SystemInstance.get().getComponent(Scheduler.class);

    boolean shared;
    try { // == is the faster way to test, we rely on name (key in quartz registry) only for serialization
        shared = ds == scheduler || scheduler.getSchedulerName().equals(ds.getSchedulerName());
    } catch (final Exception e) {
        // no-op: default should be fine
        shared = false;
    }

    if (shared) {
        return;
    }

    // specific instance
    shutdown(scheduler);
}
/**
 * Shuts down the scheduler registered as a component of the current SystemInstance, if any.
 */
public static void shutdown() {
    final Scheduler shared = SystemInstance.get().getComponent(Scheduler.class);
    shutdown(shared);
}
/**
 * Shuts the given scheduler down, gracefully first and forcibly if that does not finish in time.
 *
 * Nothing happens for a null scheduler, or one that is already shut down or was never started.
 * Otherwise all triggers are paused and a daemon thread requests a shutdown that waits for
 * running jobs. The caller waits up to the configured timeout (at least one second) for the
 * shutdown notification; if it does not arrive, or the scheduler still is not shut down, a
 * second daemon thread forces a shutdown and is given the same timeout to finish.
 *
 * @param s the scheduler to shut down, may be null
 * @throws OpenEJBRuntimeException if one of the shutdown attempts reported a failure
 */
private static void shutdown(final Scheduler s) throws OpenEJBRuntimeException {
    try {
        if (null == s || s.isShutdown() || !s.isStarted()) {
            return;
        }

        try {
            s.pauseAll();
        } catch (final SchedulerException ignored) {
            // no-op
        }

        final long configured = SystemInstance.get().getOptions().get(QuartzResourceAdapter.OPENEJB_QUARTZ_TIMEOUT, 10000L);
        final long timeout = configured < 1000L ? 1000L : configured;

        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        String resolvedName = "Unknown";
        try {
            resolvedName = s.getSchedulerName();
        } catch (final SchedulerException e) {
            log.warning("EjbTimerService scheduler has no name");
        }
        final String name = resolvedName;

        final Thread graceful = new Thread(name + " shutdown wait") {
            @Override
            public void run() {
                try {
                    s.getListenerManager().addSchedulerListener(new SchedulerListenerSupport() {
                        @Override
                        public void schedulerShutdown() {
                            done.countDown();
                        }
                    });

                    //Shutdown, but give running jobs a chance to complete.
                    //User scheduled jobs should really implement InterruptableJob
                    s.shutdown(true);
                } catch (final Throwable e) {
                    failure.set(e);
                    // release the waiting caller, the notification will never come
                    done.countDown();
                }
            }
        };
        graceful.setDaemon(true);
        graceful.start();

        boolean stopped = false;
        try {
            stopped = done.await(timeout, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            //Ignore
        }

        try {
            if (!stopped || !s.isShutdown()) {
                final Thread forced = new Thread(name + " shutdown forced") {
                    @Override
                    public void run() {
                        try {
                            //Force a shutdown without waiting for jobs to complete.
                            s.shutdown(false);
                            log.warning("Forced " + name + " shutdown - Jobs may be incomplete");
                        } catch (final Throwable e) {
                            failure.set(e);
                        }
                    }
                };
                forced.setDaemon(true);
                forced.start();

                try {
                    //Give the forced shutdown a chance to complete
                    forced.join(timeout);
                } catch (final InterruptedException e) {
                    //Ignore
                }
            }
        } catch (final Throwable e) {
            failure.set(e);
        }

        final Throwable reported = failure.get();
        if (null != reported) {
            throw new OpenEJBRuntimeException("Unable to shutdown " + name + " scheduler", reported);
        }
    } catch (final SchedulerException e) {
        //Ignore - This can only be a shutdown issue that we have no control over.
    }
}
/**
 * Starts this timer service if it is not started yet: resolves the scheduler, loads the
 * timers saved for this bean from the timer store and marks each of them as a new timer.
 *
 * @throws TimerStoreException if the saved timers cannot be loaded
 */
@Override
public void start() throws TimerStoreException {
    if (!isStarted()) {
        scheduler = getDefaultScheduler(deployment);

        // load saved timers
        final String deploymentId = (String) deployment.getDeploymentID();
        final Collection<TimerData> savedTimers = timerStore.loadTimers(this, deploymentId);

        // schedule the saved timers
        for (final TimerData saved : savedTimers) {
            initializeNewTimer(saved);
        }
    }
}
/**
* @return the transaction manager this service was created with, or the default one after deserialization
*/
public TransactionManager getTransactionManager() {
return transactionManager;
}
/**
 * Called from TimerData when a timer should be registered with the Quartz scheduler.
 *
 * The service is started if needed. The trigger of the timer is bound to the shared timeout
 * job and gets this service and the timer stored in its job data map. A trigger that is not
 * known to the scheduler is scheduled; one that exists in PAUSED state is unscheduled and
 * scheduled again; any other existing trigger is left alone. Scheduler failures at that stage
 * are logged, not propagated.
 *
 * @param timerData the timer to schedule
 * @throws TimerStoreException     if starting fails or no scheduler is available
 * @throws OpenEJBRuntimeException if the timer's trigger is not an AbstractTrigger (this includes a missing trigger)
 */
public void schedule(final TimerData timerData) throws TimerStoreException {
    start();

    if (scheduler == null) {
        throw new TimerStoreException("Scheduler is not configured properly");
    }

    timerData.setScheduler(scheduler);

    final Trigger trigger = timerData.getTrigger();
    if (trigger == null) {
        // only worth a warning while the scheduler is still running
        try {
            if (!scheduler.isShutdown()) {
                log.warning("Failed to schedule: " + timerData.getInfo());
            }
        } catch (final SchedulerException ignored) {
            //Ignore
        }
    }

    if (!(trigger instanceof AbstractTrigger)) {
        throw new OpenEJBRuntimeException("the trigger was not an AbstractTrigger - Should not be possible: " + trigger);
    }
    final AbstractTrigger<?> atrigger = (AbstractTrigger<?>) trigger;
    atrigger.setJobName(OPENEJB_TIMEOUT_JOB_NAME);
    atrigger.setJobGroup(OPENEJB_TIMEOUT_JOB_GROUP_NAME);

    final JobDataMap jobData = trigger.getJobDataMap();
    jobData.put(EjbTimeoutJob.EJB_TIMERS_SERVICE, this);
    jobData.put(EjbTimeoutJob.TIMER_DATA, timerData);

    try {
        final TriggerKey key = new TriggerKey(atrigger.getName(), atrigger.getGroup());
        final boolean known = scheduler.checkExists(key);
        if (!known) {
            scheduler.scheduleJob(trigger);
        } else if (Trigger.TriggerState.PAUSED.equals(scheduler.getTriggerState(key))) { // redeployment
            // more consistent in the semantic than a resume but resume would maybe be more relevant here
            scheduler.unscheduleJob(key);
            scheduler.scheduleJob(trigger);
        }
    } catch (final Exception e) {
        //TODO Any other actions we could do ?
        log.error("Could not schedule timer " + timerData, e);
    }
}
/**
* Call back from TimerData and ejbTimeout when a timer has been cancelled (or is complete) and should be removed from stores.
*
* Removes the timer with the id of the given timer data from the timer store.
*
* @param timerData the timer that was cancelled
*/
public void cancelled(final TimerData timerData) {
// make sure it was removed from the store
timerStore.removeTimer(timerData.getId());
}
/**
 * Puts a timer back into the TimerStore, e.g. when a cancel() is rolled back.
 *
 * A failure to add the timer is logged as a warning and not propagated.
 *
 * @param timerData the timer to be returned to the timer store
 */
public void addTimerData(final TimerData timerData) {
    try {
        timerStore.addTimerData(timerData);
    } catch (final Exception e) {
        final String typeName = timerData.getType().name();
        log.warning("Could not add timer of type " + typeName + " due to " + e.getMessage());
    }
}
/**
 * Looks up a timer of this bean by id.
 *
 * @param timerId the id of the timer
 * @return the timer, or null when the timer store has no timer data for that id
 */
@Override
public Timer getTimer(final long timerId) {
    final String deploymentId = (String) deployment.getDeploymentID();
    final TimerData found = timerStore.getTimer(deploymentId, timerId);
    return found == null ? null : found.getTimer();
}
/**
 * Returns the timers currently held by the timer store for this bean.
 *
 * All timers are returned; no filtering is applied and the primary key is not used.
 *
 * @param primaryKey not used by this implementation
 * @return a new collection with the timers of the bean
 * @throws IllegalStateException if timer methods may not be called in the current context
 */
@Override
public Collection<Timer> getTimers(final Object primaryKey) throws IllegalStateException {
    checkState();

    final Collection<Timer> result = new ArrayList<>();
    final String deploymentId = (String) deployment.getDeploymentID();
    final Collection<TimerData> stored = timerStore.getTimers(deploymentId);
    for (final TimerData data : stored) {
        result.add(data.getTimer());
    }
    return result;
}
/**
 * Creates a single action timer expiring {@code duration} milliseconds from now.
 *
 * @param primaryKey    the primary key handed to the timer store
 * @param timeoutMethod the method to call on expiration
 * @param duration      delay in milliseconds, must not be negative
 * @param timerConfig   the timer configuration handed to the timer store
 * @return the created timer
 * @throws IllegalArgumentException if duration is negative
 * @throws IllegalStateException    if timer methods may not be called in the current context
 * @throws EJBException             if the timer store fails to create the timer
 */
@Override
public Timer createTimer(final Object primaryKey,
                         final Method timeoutMethod,
                         final long duration,
                         final TimerConfig timerConfig) throws IllegalArgumentException, IllegalStateException, EJBException {
    if (duration < 0) {
        throw new IllegalArgumentException("duration is negative: " + duration);
    }
    checkState();

    final long now = System.currentTimeMillis();
    final Date expiration = new Date(now + duration);
    final String deploymentId = (String) deployment.getDeploymentID();
    try {
        final TimerData created = timerStore.createSingleActionTimer(this, deploymentId, primaryKey, timeoutMethod, expiration, timerConfig);
        initializeNewTimer(created);
        return created.getTimer();
    } catch (final TimerStoreException e) {
        throw new EJBException(e);
    }
}
/**
 * Creates an interval timer whose first expiration is {@code initialDuration} milliseconds from now.
 *
 * @param primaryKey       the primary key handed to the timer store
 * @param timeoutMethod    the method to call on expiration
 * @param initialDuration  delay in milliseconds before the first expiration, must not be negative
 * @param intervalDuration interval in milliseconds handed to the timer store, must not be negative
 * @param timerConfig      the timer configuration handed to the timer store
 * @return the created timer
 * @throws IllegalArgumentException if one of the durations is negative
 * @throws IllegalStateException    if timer methods may not be called in the current context
 * @throws EJBException             if the timer store fails to create the timer
 */
@Override
public Timer createTimer(final Object primaryKey,
                         final Method timeoutMethod,
                         final long initialDuration,
                         final long intervalDuration,
                         final TimerConfig timerConfig) throws IllegalArgumentException, IllegalStateException, EJBException {
    if (initialDuration < 0) {
        throw new IllegalArgumentException("initialDuration is negative: " + initialDuration);
    }
    if (intervalDuration < 0) {
        throw new IllegalArgumentException("intervalDuration is negative: " + intervalDuration);
    }
    checkState();

    final long now = System.currentTimeMillis();
    final Date initialExpiration = new Date(now + initialDuration);
    final String deploymentId = (String) deployment.getDeploymentID();
    try {
        final TimerData created = timerStore.createIntervalTimer(
            this, deploymentId, primaryKey, timeoutMethod, initialExpiration, intervalDuration, timerConfig);
        initializeNewTimer(created);
        return created.getTimer();
    } catch (final TimerStoreException e) {
        throw new EJBException(e);
    }
}
/**
 * Creates a single action timer expiring at the given date.
 *
 * @param primaryKey    the primary key handed to the timer store
 * @param timeoutMethod the method to call on expiration
 * @param expiration    the expiration date, must not be null nor have a negative time value
 * @param timerConfig   the timer configuration handed to the timer store
 * @return the created timer
 * @throws IllegalArgumentException if expiration is null or its time value is negative
 * @throws IllegalStateException    if timer methods may not be called in the current context
 * @throws EJBException             if the timer store fails to create the timer
 */
@Override
public Timer createTimer(final Object primaryKey,
                         final Method timeoutMethod,
                         final Date expiration,
                         final TimerConfig timerConfig) throws IllegalArgumentException, IllegalStateException, EJBException {
    if (expiration == null) {
        throw new IllegalArgumentException("expiration is null");
    }
    final long expirationTime = expiration.getTime();
    if (expirationTime < 0) {
        throw new IllegalArgumentException("expiration is negative: " + expirationTime);
    }
    checkState();

    final String deploymentId = (String) deployment.getDeploymentID();
    try {
        final TimerData created = timerStore.createSingleActionTimer(this, deploymentId, primaryKey, timeoutMethod, expiration, timerConfig);
        initializeNewTimer(created);
        return created.getTimer();
    } catch (final TimerStoreException e) {
        throw new EJBException(e);
    }
}
/**
 * Creates an interval timer whose first expiration is the given date.
 *
 * @param primaryKey        the primary key handed to the timer store
 * @param timeoutMethod     the method to call on expiration
 * @param initialExpiration date of the first expiration, must not be null nor have a negative time value
 * @param intervalDuration  interval in milliseconds handed to the timer store, must not be negative
 * @param timerConfig       the timer configuration handed to the timer store
 * @return the created timer
 * @throws IllegalArgumentException if initialExpiration is null or negative, or intervalDuration is negative
 * @throws IllegalStateException    if timer methods may not be called in the current context
 * @throws EJBException             if the timer store fails to create the timer
 */
@Override
public Timer createTimer(final Object primaryKey,
                         final Method timeoutMethod,
                         final Date initialExpiration,
                         final long intervalDuration,
                         final TimerConfig timerConfig) throws IllegalArgumentException, IllegalStateException, EJBException {
    if (initialExpiration == null) {
        throw new IllegalArgumentException("initialExpiration is null");
    }
    final long initialTime = initialExpiration.getTime();
    if (initialTime < 0) {
        throw new IllegalArgumentException("initialExpiration is negative: " + initialTime);
    }
    if (intervalDuration < 0) {
        throw new IllegalArgumentException("intervalDuration is negative: " + intervalDuration);
    }
    checkState();

    final String deploymentId = (String) deployment.getDeploymentID();
    try {
        final TimerData created = timerStore.createIntervalTimer(
            this, deploymentId, primaryKey, timeoutMethod, initialExpiration, intervalDuration, timerConfig);
        initializeNewTimer(created);
        return created.getTimer();
    } catch (final TimerStoreException e) {
        throw new EJBException(e);
    }
}
/**
 * Creates a calendar based timer from a schedule expression.
 *
 * The timer is created through the timer store with its last flag set to false.
 *
 * @param primaryKey         the primary key handed to the timer store
 * @param timeoutMethod      the method to call on expiration
 * @param scheduleExpression the schedule of the timer, must not be null
 * @param timerConfig        the timer configuration handed to the timer store
 * @return the created timer
 * @throws IllegalArgumentException if scheduleExpression is null
 * @throws IllegalStateException    if timer methods may not be called in the current context
 * @throws EJBException             if the timer store fails to create the timer
 */
@Override
public Timer createTimer(final Object primaryKey, final Method timeoutMethod, final ScheduleExpression scheduleExpression, final TimerConfig timerConfig) {
    if (scheduleExpression == null) {
        throw new IllegalArgumentException("scheduleExpression is null");
    }
    //TODO add more schedule expression validation logic ?
    checkState();

    final String deploymentId = (String) deployment.getDeploymentID();
    try {
        final TimerData created = timerStore.createCalendarTimer(
            this, deploymentId, primaryKey, timeoutMethod, scheduleExpression, timerConfig, false);
        initializeNewTimer(created);
        return created.getTimer();
    } catch (final TimerStoreException e) {
        throw new EJBException(e);
    }
}
/**
* @return the timer store backing this service
*/
@Override
public TimerStore getTimerStore() {
return timerStore;
}
/**
* @return true once a scheduler has been assigned to this service (by start() or deserialization)
*/
@Override
public boolean isStarted() {
return scheduler != null;
}
/**
* @return the scheduler of this service, or null when the service has not been started
*/
public Scheduler getScheduler() {
return scheduler;
}
/**
* Flags the given timer data as a new timer by delegating to TimerData.newTimer().
*
* @param timerData the timer that was just created or loaded
*/
private void initializeNewTimer(final TimerData timerData) {
// mark this as a new timer... when the transaction completes it will schedule the timer
timerData.newTimer();
}
/**
 * Ensures that timer methods can be invoked for the current operation on this Context.
 *
 * Delegates the decision to the bean's EJBContext, which is expected to be a BaseContext.
 *
 * @throws IllegalStateException if the context rejects timer method calls
 */
private void checkState() throws IllegalStateException {
    ((BaseContext) deployment.get(EJBContext.class)).doCheck(BaseContext.Call.timerMethod);
}
/**
 * This method calls the ejbTimeout method and starts a transaction if the timeout is transacted.
 *
 * This method will retry failed ejbTimeout calls until retryAttempts is exceeded. A call counts as
 * failed when it throws a RuntimeException or an OpenEJBException; for a transacted timeout the
 * transaction is then marked rollback-only, rolled back and the call is attempted again.
 *
 * Whatever the outcome, the context class loader of the calling thread is restored and the timer
 * store is updated: single action timers and calendar timers without a next timeout are removed
 * and flagged expired, every other timer is updated in the store.
 *
 * @param timerData the timer to call.
 */
@SuppressWarnings("ReturnInsideFinallyBlock")
public void ejbTimeout(final TimerData timerData) {
    final Thread thread = Thread.currentThread();
    final ClassLoader loader = thread.getContextClassLoader(); // container loader
    try {
        Timer timer = getTimer(timerData.getId());
        // quartz can be backed by some advanced config (jdbc for instance)
        if (timer == null && timerStore instanceof MemoryTimerStore && timerData.getTimer() != null) {
            try {
                timerStore.addTimerData(timerData);
                timer = timerData.getTimer(); // TODO: replace memoryjobstore by the db one?
            } catch (final TimerStoreException e) {
                // shouldn't occur
            }
            // return;
        }

        for (int tries = 0; tries < 1 + retryAttempts; tries++) {
            boolean retry = false;

            // if transacted, begin the transaction
            if (transacted) {
                try {
                    transactionManager.begin();
                } catch (final Exception e) {
                    log.warning("Exception occurred while starting container transaction", e);
                    return;
                }
            }

            // call the timeout method
            try {
                final RpcContainer container = (RpcContainer) deployment.getContainer();
                if (container == null) {
                    return;
                }

                final Method ejbTimeout = timerData.getTimeoutMethod();
                if (ejbTimeout == null) {
                    return;
                }

                // if app registered Synchronization we need it for commit()/rollback()
                // so forcing it and not relying on container for it
                thread.setContextClassLoader(deployment.getClassLoader() != null ? deployment.getClassLoader() : loader);

                SetAccessible.on(ejbTimeout);
                container.invoke(deployment.getDeploymentID(),
                    InterfaceType.TIMEOUT,
                    ejbTimeout.getDeclaringClass(),
                    ejbTimeout,
                    new Object[]{timer},
                    timerData.getPrimaryKey());
            } catch (final RuntimeException e) {
                retry = true;
                // exception from a timer does not necessairly mean failure
                log.warning("RuntimeException from ejbTimeout on " + deployment.getDeploymentID(), e);
                // Only touch the transaction we started ourselves: without one, setRollbackOnly()
                // may throw IllegalStateException out of this handler and abort the retry loop.
                if (transacted) {
                    try {
                        transactionManager.setRollbackOnly();
                    } catch (final SystemException e1) {
                        log.warning("Exception occured while setting RollbackOnly for container transaction", e1);
                    }
                }
            } catch (final OpenEJBException e) {
                retry = true;
                if (ApplicationException.class.isInstance(e)) { // we don't want to pollute logs
                    log.debug("Exception from ejbTimeout on " + deployment.getDeploymentID(), e);
                } else {
                    log.warning("Exception from ejbTimeout on " + deployment.getDeploymentID(), e);
                }
                if (transacted) {
                    try {
                        transactionManager.setRollbackOnly();
                    } catch (final SystemException e1) {
                        log.warning("Exception occured while setting RollbackOnly for container transaction", e1);
                    }
                }
            } finally {
                // the returns below deliberately end the retry loop once the call is considered done
                try {
                    if (!transacted) {
                        if (!retry) {
                            return;
                        }
                    } else if (transactionManager.getStatus() == Status.STATUS_ACTIVE) {
                        transactionManager.commit();
                        return;
                    } else {
                        // tx was marked rollback, so roll it back and retry.
                        transactionManager.rollback();
                    }
                } catch (final Exception e) {
                    log.warning("Exception occured while completing container transaction", e);
                }
            }
        }
        log.warning("Failed to execute ejbTimeout on " + timerData.getDeploymentId() + " successfully within " + retryAttempts + " attempts");
    } catch (final RuntimeException e) {
        log.warning("RuntimeException occured while calling ejbTimeout", e);
        throw e;
    } catch (final Error e) {
        log.warning("Error occured while calling ejbTimeout", e);
        throw e;
    } finally {
        thread.setContextClassLoader(loader);

        // clean up the timer store
        //TODO shall we do all this via Quartz listener ???
        if (timerData.getType() == TimerType.SingleAction) {
            timerStore.removeTimer(timerData.getId());
            timerData.setExpired(true);
        } else if (timerData.getType() == TimerType.Calendar && timerData.getNextTimeout() == null) {
            timerStore.removeTimer(timerData.getId());
            timerData.setExpired(true);
        } else {
            timerStore.updateIntervalTimer(timerData);
        }
    }
}
/**
* The timer task registered with the java.util.Timer. The run method of this class
* simply adds an execution of the ejbTimeout method to the thread pool. It is
* important to use the thread pool, since the java.util.Timer is single threaded.
*/
/*private class EjbTimeoutTimerTask extends TimerTask {
private final TimerData timerData;
public EjbTimeoutTimerTask(TimerData timerData) {
this.timerData = timerData;
}
public void run() {
threadPool.execute(new Runnable() {
public void run() {
ejbTimeout(timerData);
}
});
}
}*/
/**
 * Invocation handler behind the scheduler proxy installed on deserialization: every call is
 * forwarded to the scheduler resolved at that moment through getDefaultScheduler for the bean.
 */
private static class LazyScheduler implements InvocationHandler {
    private final BeanContext ejb;

    public LazyScheduler(final BeanContext deployment) {
        ejb = deployment;
    }

    /**
     * Forwards the call to the current scheduler of the bean.
     *
     * Exceptions thrown by the scheduler are rethrown as-is rather than wrapped, so that callers
     * of the proxy see the exception types declared by the Scheduler interface.
     */
    @Override
    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
        try {
            return method.invoke(getDefaultScheduler(ejb), args);
        } catch (final InvocationTargetException e) {
            // reflection wraps whatever the target threw; unwrap it, otherwise the proxy would
            // surface a checked wrapper as UndeclaredThrowableException
            final Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        }
    }
}
}