| package org.apache.qpid.messaging.util.failover; |
| |
| import org.apache.qpid.messaging.ConnectionException; |
| import org.apache.qpid.messaging.Message; |
| import org.apache.qpid.messaging.MessagingException; |
| import org.apache.qpid.messaging.SenderException; |
| import org.apache.qpid.messaging.Session; |
| import org.apache.qpid.messaging.SessionException; |
| import org.apache.qpid.messaging.TransportFailureException; |
| import org.apache.qpid.messaging.internal.ConnectionEvent; |
| import org.apache.qpid.messaging.internal.ConnectionEventListener; |
| import org.apache.qpid.messaging.internal.SenderInternal; |
| import org.apache.qpid.messaging.internal.SessionInternal; |
| import org.apache.qpid.messaging.util.AbstractSenderDecorator; |
| import org.apache.qpid.messaging.util.failover.SessionFailoverDecorator.SessionState; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * <p>A Decorator that adds failover and basic housekeeping tasks to a Sender. |
| * This class adds, |
| * <ol> |
| * <li>Failover support.</li> |
| * <li>State management.</li> |
| * <li>Exception handling.</li> |
| * <li>Failover</li> |
| * </ol></p> |
| * |
| * <p><b>Exception Handling</b><br> |
| * This class will wrap each method call to it's delegate to handle error situations. |
| * First it will check if the Receiver is already CLOSED or FAILOVER_IN_PROGRESS state. |
| * If latter it will wait until the Sender is moved to OPENED, CLOSED or the timer expires. |
| * For the last two cases a SenderException will be thrown and the Sender closed.</p> |
| * |
| * <p><b>TransportFailureException</b><br> |
| * This class intercepts TransportFailureExceptions and are passed onto the session, |
| * via the exception() method, which in turn passes into the connection. |
| * The Sender will be marked as FAILOVER_IN_PROGRESS and the "operation" will be |
| * blocked until the exception() on the Session object returns. At this point |
| * the Sender is either moved to OPENED or CLOSED.</p> |
| * |
| * <p><b>SessionException</b><br> |
| * For the time being, anytime a session exception is received, the Sender will be marked CLOSED. |
| * We need to revisit this.</p> |
| * |
| * <p><i> <b>Close() can be called by,</b> |
| * <ol> |
| * <li>The application (normal close).</li> |
| * <li>By the session object, if close is called on it.(normal close)</li> |
| * <li>By the connection object, if close is called on it.(normal close)</li> |
| * <li>By the connection object, if failover was unsuccessful(error)</li> |
| * <li>By itself (via the session) if it receives and exception (error).</li> |
| * </ol> |
| * </i></p> |
| */ |
| public class SenderFailoverDecorator extends AbstractSenderDecorator implements ConnectionEventListener |
| { |
| private static Logger _logger = LoggerFactory.getLogger(SenderFailoverDecorator.class); |
| |
| public enum SenderState {OPENED, CLOSED, FAILOVER_IN_PROGRESS}; |
| |
| private SenderState _state = SenderState.OPENED; |
| private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); |
| private SenderException _lastException; |
| private long _connSerialNumber = 0; |
| |
| public SenderFailoverDecorator(SessionInternal ssn, SenderInternal delegate) |
| { |
| super(ssn,delegate); |
| synchronized(_connectionLock) |
| { |
| _connSerialNumber = ssn.getConnectionInternal().getSerialNumber(); |
| } |
| } |
| |
| @Override |
| public void send(Message message, boolean sync) throws MessagingException |
| { |
| checkPreConditions(); |
| long serialNumber = _connSerialNumber; // take a snapshot |
| try |
| { |
| _delegate.send(message, sync); |
| } |
| catch (TransportFailureException e) |
| { |
| failover(e,serialNumber); |
| send(message, sync); |
| } |
| catch (SessionException e) |
| { |
| throw handleSessionException(e); |
| } |
| } |
| |
| @Override |
| public void close() throws MessagingException |
| { |
| synchronized (_connectionLock) |
| { |
| if (_state == SenderState.CLOSED) |
| { |
| throw new MessagingException("Sender is already closed"); |
| } |
| _state = SenderState.CLOSED; |
| super.close(); |
| } |
| } |
| |
| @Override |
| public void setCapacity(int capacity) throws MessagingException |
| { |
| checkPreConditions(); |
| long serialNumber = _connSerialNumber; // take a snapshot |
| try |
| { |
| _delegate.setCapacity(capacity); |
| } |
| catch (TransportFailureException e) |
| { |
| failover(e,serialNumber); |
| setCapacity(capacity); |
| } |
| catch (SessionException e) |
| { |
| throw handleSessionException(e); |
| } |
| } |
| |
| @Override |
| public int getCapacity() throws MessagingException |
| { |
| checkPreConditions(); |
| long serialNumber = _connSerialNumber; // take a snapshot |
| try |
| { |
| return _delegate.getCapacity(); |
| } |
| catch (TransportFailureException e) |
| { |
| failover(e,serialNumber); |
| return getCapacity(); |
| } |
| catch (SessionException e) |
| { |
| throw handleSessionException(e); |
| } |
| } |
| |
| @Override |
| public int getAvailable() throws MessagingException |
| { |
| checkPreConditions(); |
| long serialNumber = _connSerialNumber; // take a snapshot |
| try |
| { |
| return _delegate.getAvailable(); |
| } |
| catch (TransportFailureException e) |
| { |
| failover(e,serialNumber); |
| return getAvailable(); |
| } |
| catch (SessionException e) |
| { |
| throw handleSessionException(e); |
| } |
| } |
| |
| @Override |
| public int getUnsettled() throws MessagingException |
| { |
| checkPreConditions(); |
| long serialNumber = _connSerialNumber; // take a snapshot |
| try |
| { |
| return _delegate.getUnsettled(); |
| } |
| catch (TransportFailureException e) |
| { |
| failover(e,serialNumber); |
| return getUnsettled(); |
| } |
| catch (SessionException e) |
| { |
| throw handleSessionException(e); |
| } |
| } |
| |
| @Override |
| public boolean isClosed() throws MessagingException |
| { |
| return _state == SenderState.CLOSED; |
| } |
| |
| @Override |
| public String getName() throws MessagingException |
| { |
| checkPreConditions(); |
| return getName(); |
| } |
| |
| @Override |
| public Session getSession() throws MessagingException |
| { |
| checkPreConditions(); |
| _ssn.checkError(); |
| return _ssn; |
| } |
| |
| @Override |
| public void recreate() throws MessagingException |
| { |
| synchronized(_connectionLock) |
| { |
| _connSerialNumber = _ssn.getConnectionInternal().getSerialNumber(); |
| _delegate.recreate(); |
| _state = SenderState.OPENED; |
| } |
| } |
| |
| @Override |
| public void eventOccured(ConnectionEvent event) |
| { |
| synchronized (_connectionLock) |
| { |
| switch(event.getType()) |
| { |
| case PRE_FAILOVER: |
| case CONNECTION_LOST: |
| _state = SenderState.FAILOVER_IN_PROGRESS; |
| break; |
| case RECONNCTED: |
| _state = SenderState.OPENED; |
| break; |
| case POST_FAILOVER: |
| try |
| { |
| if (_state != SenderState.OPENED) |
| { |
| close(); |
| } |
| } |
| catch (MessagingException e) |
| { |
| _logger.warn("Exception when trying to close the Sender", e); |
| } |
| _connectionLock.notifyAll(); |
| break; |
| default: |
| break; //ignore the rest |
| } |
| } |
| } |
| |
| @Override // From ConnectionEventListener |
| public void exception(ConnectionException e) |
| {// NOOP |
| } |
| |
| protected void waitForFailoverToComplete() throws SenderException |
| { |
| synchronized (_connectionLock) |
| { |
| try |
| { |
| _connectionLock.wait(_failoverTimeout); |
| } |
| catch (InterruptedException e) |
| { |
| //ignore. |
| } |
| if (_state == SenderState.CLOSED) |
| { |
| throw new SenderException("Sender is closed. Failover was unsuccesfull",_lastException); |
| } |
| else if (_state == SenderState.FAILOVER_IN_PROGRESS) |
| { |
| closeInternal(); |
| throw new SenderException("Sender is closed. Failover did not complete on time"); |
| } |
| } |
| } |
| |
| protected void failover(TransportFailureException e, long serialNumber) throws SenderException |
| { |
| synchronized (_connectionLock) |
| { |
| if (_connSerialNumber > serialNumber) |
| { |
| return; // ignore, we already have failed over. |
| } |
| _state = SenderState.FAILOVER_IN_PROGRESS; |
| _ssn.exception(e, serialNumber); // This triggers failover. |
| waitForFailoverToComplete(); |
| } |
| } |
| |
| protected void checkPreConditions() throws SenderException |
| { |
| switch (_state) |
| { |
| case CLOSED: |
| throw new SenderException("Sender is closed. You cannot invoke methods on a closed sender",_lastException); |
| case FAILOVER_IN_PROGRESS: |
| waitForFailoverToComplete(); |
| } |
| } |
| |
| /** |
| * Session Exceptions will generally invalidate the Session. |
| * TODO this needs to be revisited again. |
| * A new session will need to be created in that case. |
| * @param e |
| * @throws MessagingException |
| */ |
| protected SenderException handleSessionException(SessionException e) |
| { |
| synchronized (_connectionLock) |
| { |
| // This should close all senders (including this) and Senders. |
| _ssn.exception(e); |
| } |
| return new SenderException("Session has been closed",e); |
| } |
| |
| /** Suppress Exceptions as */ |
| private void closeInternal() |
| { |
| try |
| { |
| close(); |
| } |
| catch (Exception e) |
| { |
| //ignore |
| } |
| } |
| } |