blob: d49f681dd3c9e1a24b60d2c5eb54c63488097619 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.messaging.util.failover;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.messaging.ConnectionException;
import org.apache.qpid.messaging.Message;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.SessionException;
import org.apache.qpid.messaging.TransportFailureException;
import org.apache.qpid.messaging.internal.ConnectionEvent;
import org.apache.qpid.messaging.internal.ConnectionEventListener;
import org.apache.qpid.messaging.internal.ConnectionInternal;
import org.apache.qpid.messaging.internal.ReceiverInternal;
import org.apache.qpid.messaging.internal.SenderInternal;
import org.apache.qpid.messaging.internal.SessionInternal;
import org.apache.qpid.messaging.util.AbstractSessionDecorator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A Decorator that adds failover and basic housekeeping tasks to a session.
* This class adds,
* <ol>
* <li>Failover support.</li>
* <li>Management of receivers and senders created by this session.</li>
* <li>State management.</li>
* <li>Exception handling.</li>
* </ol></p>
*
* <p><b>Exception Handling</b><br>
* This class will wrap each method call to it's delegate to handle error situations.
* First it will check if the session is already CLOSED or FAILOVER_IN_PROGRESS state.
* If latter it will wait until the Session is moved to OPENED, CLOSED or the timer expires.
* For the last two cases a SessionException will be thrown and the Session closed.</p>
*
* <p><b>TransportFailureException</b><br>
* This class intercepts TransportFailureExceptions and are passed onto the connection.
* The Session will be marked as FAILOVER_IN_PROGRESS and the "operation" will be
* blocked until the exception() on the Connection object returns. At this point
* the Session is either moved to OPENED or CLOSED.</p>
*
* <p><b>SessionException</b><br>
* For the time being, anytime a session exception is received, the session will be marked CLOSED.
* We need to revisit this.</p>
*
* <p><i> <b>Close() can be called by,</b>
* <ol>
* <li>The application (normal close).</li>
* <li>By the connection object, if close is called on it.(normal close)</li>
* <li>By the connection object, if failover was unsuccessful(error)</li>
* <li>By itself if it receives and exception (error).</li>
* </ol>
* </i></p>
*/
public class SessionFailoverDecorator extends AbstractSessionDecorator implements ConnectionEventListener
{
private static Logger _logger = LoggerFactory.getLogger(SessionFailoverDecorator.class);
public enum SessionState {OPENED, CLOSED, FAILOVER_IN_PROGRESS}
private SessionState _state = SessionState.OPENED;
private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000);
private SessionException _lastException;
private long _connSerialNumber = 0;
public SessionFailoverDecorator(ConnectionInternal conn, SessionInternal delegate)
{
super(conn,delegate);
synchronized(_connectionLock)
{
_connSerialNumber = conn.getSerialNumber();
}
}
@Override
public void close() throws MessagingException
{
synchronized(_connectionLock)
{
if (_state == SessionState.CLOSED)
{
throw new MessagingException("Session is already closed");
}
_state = SessionState.CLOSED;
super.close();
}
}
@Override
public void commit() throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
_delegate.commit();
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
commit();
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public void rollback() throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
_delegate.rollback();
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
rollback();
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public void acknowledge(boolean sync) throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
_delegate.acknowledge(sync);
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
acknowledge(sync);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public void acknowledge(Message message, boolean sync)
throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
_delegate.acknowledge(message,sync);
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
acknowledge(message,sync);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public void reject(Message message) throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
_delegate.reject(message);
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
reject(message);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public void release(Message message) throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
_delegate.release(message);
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
release(message);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public void sync(boolean block) throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
_delegate.sync(block);
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
sync(block);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public int getReceivable() throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
return _delegate.getReceivable();
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
return getReceivable();
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public int getUnsettledAcks() throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
return _delegate.getUnsettledAcks();
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
return getUnsettledAcks();
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public Receiver nextReceiver(long timeout) throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
return _delegate.nextReceiver(timeout);
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
return nextReceiver(timeout);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public Sender createSender(Address address) throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
SenderInternal sender = new SenderFailoverDecorator(this,
(SenderInternal) _delegate.createSender(address));
synchronized (_connectionLock)
{
_senders.add(sender);
}
return sender;
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
return createSender(address);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public Sender createSender(String address) throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
SenderInternal sender = new SenderFailoverDecorator(this,
(SenderInternal) _delegate.createSender(address));
synchronized (_connectionLock)
{
_senders.add(sender);
}
return sender;
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
return createSender(address);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public Receiver createReceiver(Address address) throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
ReceiverInternal receiver = new ReceiverFailoverDecorator(this,
(ReceiverInternal) _delegate.createReceiver(address));
synchronized (_connectionLock)
{
_receivers.add(receiver);
}
return receiver;
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
return createReceiver(address);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public Receiver createReceiver(String address) throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot
try
{
ReceiverInternal receiver = new ReceiverFailoverDecorator(this,
(ReceiverInternal) _delegate.createReceiver(address));
synchronized (_connectionLock)
{
_receivers.add(receiver);
}
return receiver;
}
catch (TransportFailureException e)
{
failover(e,serialNumber);
return createReceiver(address);
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public void checkError() throws MessagingException
{
checkPreConditions();
long serialNumber = _connSerialNumber; // take a snapshot // check if we already have the info.
try
{
// Asking the delegate.
_delegate.checkError();
}
catch (TransportFailureException e)
{
failover(e,serialNumber); // will throw an exception
return;
}
catch (SessionException e)
{
throw handleSessionException(e);
}
}
@Override
public boolean isClosed()
{
if (_state == SessionState.OPENED)
{
return super.isClosed(); // ask the delegate to be sure.
}
else
{
return true;
}
}
@Override // From SessionInternal
public void exception(TransportFailureException e, long serialNumber)
{
try
{
failover((TransportFailureException)e, serialNumber);
}
catch (SessionException ex)
{
_lastException = ex;
}
}
public void exception(SessionException e)
{
handleSessionException(e);
}
@Override
public void recreate() throws MessagingException
{
synchronized (_connectionLock)
{
_connSerialNumber = _conn.getSerialNumber();
_delegate.recreate();
for (ReceiverInternal rec : _receivers)
{
rec.recreate();
}
for (SenderInternal sender : _senders)
{
sender.recreate();
}
_state = SessionState.OPENED;
}
}
@Override
public ConnectionInternal getConnectionInternal()
{
return _conn;
}
@Override //From ConnectionEventListener
public void exception(ConnectionException e)
{
// NOOP
}
@Override //From ConnectionEventListener
public void eventOccured(ConnectionEvent event)
{
synchronized (_connectionLock)
{
switch(event.getType())
{
case PRE_FAILOVER:
case CONNECTION_LOST:
_state = SessionState.FAILOVER_IN_PROGRESS;
break;
case RECONNCTED:
_state = SessionState.OPENED;
break;
case POST_FAILOVER:
try
{
if (_state != SessionState.OPENED)
{
close();
}
}
catch (MessagingException e)
{
_logger.warn("Exception when trying to close the session", e);
}
_connectionLock.notifyAll();
break;
default:
break; //ignore the rest
}
}
}
protected void failover(TransportFailureException e, long serialNumber) throws SessionException
{
synchronized (_connectionLock)
{
if (_connSerialNumber > serialNumber)
{
return; // ignore, we already have failed over.
}
_state = SessionState.FAILOVER_IN_PROGRESS;
_conn.exception(e, serialNumber); // This triggers failover.
waitForFailoverToComplete();
}
}
protected void checkPreConditions() throws SessionException
{
switch (_state)
{
case CLOSED:
throw new SessionException("Session is closed. You cannot invoke methods on a closed session",_lastException);
case FAILOVER_IN_PROGRESS:
waitForFailoverToComplete();
}
}
/**
* Session Exceptions will generally invalidate the Session.
* TODO this needs to be revisited again.
* A new session will need to be created in that case.
* @param e
* @throws MessagingException
*/
protected SessionException handleSessionException(SessionException e)
{
synchronized (_connectionLock)
{
try
{
_lastException = e;
close();
}
catch(MessagingException ex)
{
_logger.warn("Error when closing session : " + getName(), ex);
}
}
return new SessionException("Session has been closed",e);
}
protected void waitForFailoverToComplete() throws SessionException
{
synchronized (_connectionLock)
{
try
{
_connectionLock.wait(_failoverTimeout);
}
catch (InterruptedException e)
{
//ignore.
}
if (_state == SessionState.CLOSED)
{
throw new SessionException("Session is closed. Failover was unsuccesfull");
}
else if (_state == SessionState.FAILOVER_IN_PROGRESS)
{
closeInternal();
throw new SessionException("Session is closed. Failover did not complete on time");
}
}
}
/** Suppress Exceptions as */
private void closeInternal()
{
try
{
close();
}
catch (Exception e)
{
//ignore
}
}
}