blob: 8dad12b0899b40627c31a77b13358f3af2935f0a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openejb.resource.quartz;
import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.quartz.Job;
import org.apache.openejb.quartz.JobDataMap;
import org.apache.openejb.quartz.JobExecutionContext;
import org.apache.openejb.quartz.JobExecutionException;
import org.apache.openejb.quartz.Scheduler;
import org.apache.openejb.quartz.SchedulerException;
import org.apache.openejb.quartz.impl.StdSchedulerFactory;
import org.apache.openejb.quartz.listeners.SchedulerListenerSupport;
import org.apache.openejb.util.JavaSecurityManagers;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAResource;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* @version $Rev$ $Date$
*/
public class QuartzResourceAdapter implements ResourceAdapter {
public static final String OPENEJB_QUARTZ_TIMEOUT = "openejb.quartz.timeout";
//Start and stop may be called from different threads so use atomics
private final AtomicReference<Throwable> ex = new AtomicReference<>();
private final AtomicReference<Scheduler> scheduler = new AtomicReference<>();
private final AtomicReference<BootstrapContext> bootstrapContext = new AtomicReference<>();
private final AtomicReference<Thread> startThread = new AtomicReference<>();
@Override
public void start(final BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
if (null != this.bootstrapContext.getAndSet(bootstrapContext)) {
Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").warning("QuartzResourceAdapter is already starting");
return;
}
final CountDownLatch signal = new CountDownLatch(1);
long timeout = SystemInstance.get().getOptions().get(QuartzResourceAdapter.OPENEJB_QUARTZ_TIMEOUT, 10000L);
if (timeout < 1000L) {
timeout = 1000L;
}
if (timeout > 60000L) {
timeout = 60000L;
}
//Allow org.apache.openejb.quartz.InterruptableJob implementors to be interrupted on shutdown
JavaSecurityManagers.setSystemProperty(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN
, JavaSecurityManagers.getSystemProperty(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN, "true"));
JavaSecurityManagers.setSystemProperty(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT
, JavaSecurityManagers.getSystemProperty(StdSchedulerFactory.PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT, "true"));
//Let the user enable this if they really want it
JavaSecurityManagers.setSystemProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK
, JavaSecurityManagers.getSystemProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true"));
JavaSecurityManagers.setSystemProperty("org.terracotta.quartz.skipUpdateCheck"
, JavaSecurityManagers.getSystemProperty("org.terracotta.quartz.skipUpdateCheck", "true"));
startThread.set(new Thread("Quartz Scheduler Start") {
@Override
public void run() {
try {
QuartzResourceAdapter.this.scheduler.set(StdSchedulerFactory.getDefaultScheduler());
} catch (final Exception e) {
QuartzResourceAdapter.this.ex.set(e);
return;
}
try {
QuartzResourceAdapter.this.scheduler.get().getListenerManager().addSchedulerListener(new SchedulerListenerSupport() {
@Override
public void schedulerStarted() {
signal.countDown();
}
});
QuartzResourceAdapter.this.scheduler.get().start();
} catch (final Throwable e) {
QuartzResourceAdapter.this.ex.set(e);
signal.countDown();
}
}
});
startThread.get().setDaemon(true);
startThread.get().start();
boolean started = false;
try {
started = signal.await(timeout, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
//Ignore
}
final Throwable exception = ex.get();
if (null != exception) {
final String err = "Error creating Quartz Scheduler";
Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").error(err, exception);
throw new ResourceAdapterInternalException(err, exception);
}
if (started) {
Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").info("Started Quartz Scheduler");
} else {
Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").warning("Failed to start Quartz Scheduler within defined timeout, status unknown");
}
}
public Scheduler getScheduler() {
return scheduler.get();
}
public BootstrapContext getBootstrapContext() {
return bootstrapContext.get();
}
@Override
public void stop() {
final Scheduler s = scheduler.getAndSet(null);
if (null != s) {
if (null != startThread.get()) {
startThread.get().interrupt();
}
long timeout = SystemInstance.get().getOptions().get(QuartzResourceAdapter.OPENEJB_QUARTZ_TIMEOUT, 10000L);
if (timeout < 1000L) {
timeout = 1000L;
}
if (timeout > 60000L) {
timeout = 60000L;
}
final CountDownLatch shutdownWait = new CountDownLatch(1);
Thread stopThread = new Thread("Quartz Scheduler Requested Stop") {
@Override
public void run() {
try {
s.getListenerManager().addSchedulerListener(new SchedulerListenerSupport() {
@Override
public void schedulerShutdown() {
shutdownWait.countDown();
}
});
//Shutdown, but give running jobs a chance to complete.
//User scheduled jobs should really implement InterruptableJob
s.shutdown(true);
} catch (final Throwable e) {
QuartzResourceAdapter.this.ex.set(e);
shutdownWait.countDown();
}
}
};
stopThread.setDaemon(true);
stopThread.start();
boolean stopped = false;
try {
stopped = shutdownWait.await(timeout, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
//Ignore
}
try {
if (!stopped || !s.isShutdown()) {
stopThread = new Thread("Quartz Scheduler Forced Stop") {
@Override
public void run() {
try {
//Force a shutdown without waiting for jobs to complete.
s.shutdown(false);
Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").warning("Forced Quartz stop - Jobs may be incomplete");
} catch (final Throwable e) {
QuartzResourceAdapter.this.ex.set(e);
}
}
};
stopThread.setDaemon(true);
stopThread.start();
try {
//Give the forced shutdown a chance to complete
stopThread.join(timeout);
} catch (final InterruptedException e) {
//Ignore
}
}
} catch (final Throwable e) {
ex.set(e);
}
}
this.bootstrapContext.set(null);
if (null != ex.get()) {
Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").warning("Error stopping Quartz Scheduler", ex.get());
} else {
Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources").info("Stopped Quartz Scheduler");
}
}
@Override
public void endpointActivation(final MessageEndpointFactory messageEndpointFactory, final ActivationSpec activationSpec) throws ResourceException {
final Scheduler s = scheduler.get();
if (null == s) {
throw new ResourceException("Quartz Scheduler is not available");
}
try {
final JobSpec spec = (JobSpec) activationSpec;
final MessageEndpoint endpoint = messageEndpointFactory.createEndpoint(null);
spec.setEndpoint(endpoint);
final Job job = (Job) endpoint;
final JobDataMap jobDataMap = spec.getDetail().getJobDataMap();
jobDataMap.put(Data.class.getName(), new Data(job));
s.scheduleJob(spec.getDetail(), spec.getTrigger());
} catch (final SchedulerException e) {
throw new ResourceException("Failed to schedule job", e);
}
}
@Override
public void endpointDeactivation(final MessageEndpointFactory messageEndpointFactory, final ActivationSpec activationSpec) {
final Scheduler s = scheduler.get();
if (null == s) {
throw new IllegalStateException("Quartz Scheduler is not available");
}
JobSpec spec = null;
try {
spec = (JobSpec) activationSpec;
s.deleteJob(spec.jobKey());
} catch (final SchedulerException e) {
throw new IllegalStateException("Failed to delete job", e);
} finally {
if (null != spec) {
spec.getEndpoint().release();
}
}
}
public static class JobEndpoint implements Job {
private static Method method;
@Override
public void execute(final JobExecutionContext execution) throws JobExecutionException {
MessageEndpoint endpoint = null;
JobExecutionException ex = null;
try {
final JobDataMap jobDataMap = execution.getJobDetail().getJobDataMap();
final Data data = Data.class.cast(jobDataMap.get(Data.class.getName()));
final Job job = data.job;
if (null == method) {
method = job.getClass().getMethod("execute", JobExecutionContext.class);
}
endpoint = (MessageEndpoint) job;
endpoint.beforeDelivery(method);
job.execute(execution);
} catch (final NoSuchMethodException e) {
throw new IllegalStateException(e);
} catch (final ResourceException e) {
ex = new JobExecutionException(e);
} catch (final Throwable t) {
ex = new JobExecutionException(new Exception(t));
} finally {
if (null != endpoint) {
try {
endpoint.afterDelivery();
} catch (final ResourceException e) {
ex = new JobExecutionException(e);
}
}
}
if (null != ex) {
throw ex;
}
}
}
/**
* A private inner class is used so the key and value are not publicly visible.
* This is standard OpenEJB practice for all "public storage" maps as it prevents
* outside code from becoming dependent on or tampering with the private data.
*/
private static final class Data {
private final Job job;
private Data(final Job job) {
this.job = job;
}
}
@Override
public XAResource[] getXAResources(final ActivationSpec[] activationSpecs) throws ResourceException {
return new XAResource[0];
}
}