blob: edb82796e7a0a504875c16a38ab780f1b6fca06d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.ra.inflow;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.jms.XAConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.ra.QpidResourceAdapter;
import org.apache.qpid.ra.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for inflow activations that listens for JMS connection failures
 * and tries to recover from them by tearing the connection down and running
 * {@link #setup()} again.
 * <p>
 * Subclasses supply {@link #setup()}, {@link #start()} and {@link #stop()};
 * this class provides the shared connection-factory, connection and
 * destination setup helpers together with the reconnect loop.
 */
public abstract class QpidExceptionHandler implements ExceptionListener
{
    private static final Logger _log = LoggerFactory.getLogger(QpidExceptionHandler.class);

    /** Reflective handle on {@code MessageListener.onMessage(Message)}. */
    public static final Method ONMESSAGE;

    protected final MessageEndpointFactory _endpointFactory;

    /** Current connection; created by {@link #setupConnection()}, stopped and closed by {@link #teardown()}. */
    protected Connection _connection;

    /** Connection factory chosen by {@link #setupCF()}. */
    protected ConnectionFactory _factory;

    /** Destination resolved by {@link #setupDestination()}. */
    protected Destination _destination;

    protected final QpidResourceAdapter _ra;

    protected final QpidActivationSpec _spec;

    /** Result of asking the endpoint factory whether delivery of onMessage is transacted. */
    protected boolean _isDeliveryTransacted;

    /** The recovery loop only keeps running while this flag is true; it is not set by this class. */
    protected final AtomicBoolean _deliveryActive = new AtomicBoolean(false);

    /** Whether the resolved destination is treated as a topic; see {@link #setupDestination()}. */
    protected boolean _isTopic = false;

    // Whether we are in the failure recovery loop
    protected AtomicBoolean _inFailure = new AtomicBoolean(false);

    // Whether or not we have completed activating
    protected AtomicBoolean _activated = new AtomicBoolean(false);

    static
    {
        try
        {
            ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Establishes whatever the activation needs in order to run. Called by
     * {@link #handleFailure(Throwable)} on every reconnect attempt.
     *
     * @throws Exception if setup fails; the recovery loop logs it and retries
     */
    public abstract void setup() throws Exception;

    public abstract void start() throws Exception;

    public abstract void stop();

    /**
     * @param ra the owning resource adapter
     * @param spec the activation spec describing the destination and retry policy
     * @param endpointFactory the message endpoint factory
     * @throws ResourceException if the endpoint factory cannot report whether
     *         delivery is transacted
     */
    protected QpidExceptionHandler(QpidResourceAdapter ra,
                                   QpidActivationSpec spec,
                                   MessageEndpointFactory endpointFactory) throws ResourceException
    {
        this._ra = ra;
        this._spec = spec;
        this._endpointFactory = endpointFactory;

        try
        {
            _isDeliveryTransacted = endpointFactory.isDeliveryTransacted(QpidActivation.ONMESSAGE);
        }
        catch (Exception e)
        {
            throw new ResourceException(e);
        }
    }

    /**
     * JMS exception callback. Failures are only acted upon once activation has
     * completed; before that they are just logged.
     *
     * @param e the exception reported by the JMS provider
     */
    public void onException(JMSException e)
    {
        if (_activated.get())
        {
            handleFailure(e);
        }
        else
        {
            _log.warn("Received JMSException: " + e + " while endpoint was not activated.");
        }
    }

    /**
     * Handles any failure by trying to reconnect.
     * <p>
     * Only one caller at a time runs the recovery loop; concurrent callers log
     * the failure and return. The loop repeats teardown / sleep / setup while
     * delivery is active, until setup succeeds, the configured number of setup
     * attempts is used up (-1 means no limit), or the thread is interrupted.
     *
     * @param failure the reason for the failure
     */
    public void handleFailure(Throwable failure)
    {
        if (doesNotExist(failure))
        {
            _log.info("awaiting topic/queue creation " + _spec.getDestination());
        }
        else
        {
            _log.warn("Failure in Qpid activation " + _spec, failure);
        }

        int reconnectCount = 0;
        int setupAttempts = _spec.getSetupAttempts();
        long setupInterval = _spec.getSetupInterval();

        // Only enter the failure loop once
        if (_inFailure.getAndSet(true))
        {
            return;
        }

        try
        {
            while (_deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
            {
                teardown();

                try
                {
                    Thread.sleep(setupInterval);
                }
                catch (InterruptedException e)
                {
                    _log.debug("Interrupted trying to reconnect " + _spec, e);
                    // Restore the interrupt status so the owner of this thread can still observe it
                    Thread.currentThread().interrupt();
                    break;
                }

                _log.info("Attempting to reconnect " + _spec);

                try
                {
                    setup();
                    _log.info("Reconnected with Qpid");
                    break;
                }
                catch (Throwable t)
                {
                    // Classify the error raised by this attempt, not the failure that started recovery
                    if (doesNotExist(t))
                    {
                        _log.info("awaiting topic/queue creation " + _spec.getDestination());
                    }
                    else
                    {
                        _log.error("Unable to reconnect " + _spec, t);
                    }
                }

                ++reconnectCount;
            }
        }
        finally
        {
            // Leaving failure recovery loop
            _inFailure.set(false);
        }
    }

    /**
     * Check to see if the failure represents a missing endpoint.
     *
     * @param failure The failure.
     * @return true if it represents a missing endpoint, false otherwise
     */
    protected boolean doesNotExist(final Throwable failure)
    {
        return (failure instanceof AMQException) && (((AMQException) failure).getErrorCode() == AMQConstant.NOT_FOUND);
    }

    /**
     * @return true when delivery is transacted and the spec does not ask for
     *         local transactions
     */
    protected boolean isXA()
    {
        return _isDeliveryTransacted && !_spec.isUseLocalTx();
    }

    /**
     * Creates the connection from the current factory: an XA connection when
     * {@link #isXA()} is true (the factory is cast to
     * {@link XAConnectionFactory}), otherwise a plain connection.
     *
     * @throws Exception if the connection cannot be created
     */
    protected void setupConnection() throws Exception
    {
        if (isXA())
        {
            this._connection = ((XAConnectionFactory) _factory).createXAConnection();
        }
        else
        {
            this._connection = _factory.createConnection();
        }
    }

    /**
     * Stops and then closes the current connection, if there is one. Errors
     * from either step are logged at debug level and otherwise ignored so that
     * teardown always runs to completion. The factory reference is dropped when
     * the spec reports that it has been updated.
     */
    protected synchronized void teardown()
    {
        _log.debug("Tearing down " + _spec);

        try
        {
            if (_connection != null)
            {
                _connection.stop();
            }
        }
        catch (Throwable t)
        {
            _log.debug("Error stopping connection " + Util.asString(_connection), t);
        }

        try
        {
            if (_connection != null)
            {
                _connection.close();
            }
        }
        catch (Throwable t)
        {
            _log.debug("Error closing connection " + Util.asString(_connection), t);
        }

        if (_spec.isHasBeenUpdated())
        {
            _factory = null;
        }

        _log.debug("Tearing down complete " + this);
    }

    /**
     * Selects the connection factory: one created from the spec when the spec
     * reports that it has been updated, otherwise the resource adapter's
     * default factory.
     *
     * @throws Exception if the factory cannot be obtained
     */
    protected void setupCF() throws Exception
    {
        if (_spec.isHasBeenUpdated())
        {
            _factory = _ra.createAMQConnectionFactory(_spec);
        }
        else
        {
            _factory = _ra.getDefaultAMQConnectionFactory();
        }
    }

    /**
     * Resolves the destination named by the spec, either through JNDI or by
     * creating it directly from the destination string, and records whether it
     * is a topic.
     *
     * @throws Exception if the destination cannot be looked up or created, or
     *         (when created directly) does not match the configured type
     */
    protected void setupDestination() throws Exception
    {
        String destinationName = _spec.getDestination();
        String destinationTypeString = _spec.getDestinationType();

        if (_spec.isUseJNDI())
        {
            lookupDestination(destinationName, destinationTypeString);
        }
        else
        {
            createDestination(destinationTypeString);
        }

        _log.debug("Got destination " + _destination + " from " + destinationName);
    }

    /**
     * Looks the destination up in a fresh initial naming context, which is
     * closed again once the lookup has finished, whether or not it succeeded.
     */
    private void lookupDestination(String destinationName, String destinationTypeString) throws Exception
    {
        final Context ctx = new InitialContext();
        try
        {
            _log.debug("Using context " + ctx.getEnvironment() + " for " + _spec);
            if (_log.isTraceEnabled())
            {
                _log.trace("setupDestination(" + ctx + ")");
            }

            if (isDefined(destinationTypeString))
            {
                _log.debug("Destination type defined as " + destinationTypeString);

                Class<? extends Destination> destinationType;
                if (Topic.class.getName().equals(destinationTypeString))
                {
                    destinationType = Topic.class;
                    _isTopic = true;
                }
                else
                {
                    destinationType = Queue.class;
                }

                _log.debug("Retrieving destination " + destinationName +
                           " of type " +
                           destinationType.getName());
                _destination = Util.lookup(ctx, destinationName, destinationType);
            }
            else
            {
                _log.debug("Destination type not defined");
                _log.debug("Retrieving destination " + destinationName +
                           " of type " +
                           Destination.class.getName());
                _destination = Util.lookup(ctx, destinationName, AMQDestination.class);
                // No type configured: anything that is not a Queue is treated as a topic
                _isTopic = !(_destination instanceof Queue);
            }
        }
        finally
        {
            closeQuietly(ctx);
        }
    }

    /**
     * Creates the destination directly from the spec's destination string and,
     * when a destination type is configured, verifies that the created object
     * is of that type.
     */
    private void createDestination(String destinationTypeString) throws Exception
    {
        _destination = (AMQDestination) AMQDestination.createDestination(_spec.getDestination(), false);

        if (isDefined(destinationTypeString))
        {
            _log.debug("Destination type defined as " + destinationTypeString);

            final boolean match;
            if (Topic.class.getName().equals(destinationTypeString))
            {
                match = (_destination instanceof Topic);
                _isTopic = true;
            }
            else
            {
                match = (_destination instanceof Queue);
            }

            if (!match)
            {
                throw new ClassCastException("Expected destination of type " + destinationTypeString + " but created destination " + _destination);
            }
        }
        else
        {
            _isTopic = !(_destination instanceof Queue);
        }
    }

    /** @return true when the value is non-null and not blank */
    private static boolean isDefined(String value)
    {
        return value != null && !value.trim().equals("");
    }

    /** Closes the naming context, logging rather than propagating any failure. */
    private static void closeQuietly(Context ctx)
    {
        try
        {
            ctx.close();
        }
        catch (javax.naming.NamingException e)
        {
            _log.debug("Error closing naming context", e);
        }
    }
}